diff --git a/src/client.toit b/src/client.toit index d00f156..f8a5477 100644 --- a/src/client.toit +++ b/src/client.toit @@ -3,6 +3,8 @@ // found in the LICENSE file. import log +import net +import tls import .full_client import .session_options import .last_will @@ -22,7 +24,7 @@ class Client: Constructs a new routing MQTT client. The $transport parameter is used to send messages and is usually a TCP socket instance. - See $TcpTransport. + See $(constructor --host), $Client.tls, and $TcpTransport. The $routes must be a map from topic (of type $string) to callback. After the client started, it will automatically subscribe to all topics in the map (with a max-qos of 1). If the broker already @@ -40,6 +42,36 @@ class Client: max_qos := 1 subscription_callbacks_.set topic (CallbackEntry_ callback max_qos) + /** + Variant of $(constructor --transport) that connects to the given $host:$port over TCP. + */ + constructor + --host /string + --port /int = 1883 + --net_open /Lambda? = (:: net.open) + --logger /log.Logger = log.default + --routes /Map = {:}: + transport := TcpTransport --host=host --port=port --net_open=net_open + return Client --transport=transport --logger=logger --routes=routes + + /** + Variant of $(constructor --host) that supports TLS. + */ + constructor.tls + --host /string + --port /int = 8883 + --net_open /Lambda? = (:: net.open) + --root_certificates /List = [] + --server_name /string? = null + --certificate /tls.Certificate? = null + --logger /log.Logger = log.default + --routes /Map = {:}: + transport := TcpTransport.tls --host=host --port=port --net_open=net_open + --root_certificates=root_certificates + --server_name=server_name + --certificate=certificate + return Client --transport=transport --logger=logger --routes=routes + /** Variant of $(start --options). diff --git a/src/full_client.toit b/src/full_client.toit index 31dcd4d..52e39c8 100644 --- a/src/full_client.toit +++ b/src/full_client.toit @@ -198,6 +198,11 @@ interface ReconnectionStrategy: The strategy should first call $send_connect, followed by a $receive_connect_ack. If the connection is unsuccessful, it may retry. + The $disconnect_transport may be called before attempting a reconnect. It shuts down the + network, thus increasing the chance of a successful reconnect. However, a complete + network reconnect takes more time. We recommend to try at least once to reconnect without + a disconnect first, and then call $disconnect_transport for later attempts. + The $receive_connect_ack block returns whether the broker had a session for this client. The $is_initial_connection is true if this is the first time the client connects to the broker. @@ -206,6 +211,7 @@ interface ReconnectionStrategy: transport/ActivityMonitoringTransport --is_initial_connection /bool [--reconnect_transport] + [--disconnect_transport] [--send_connect] [--receive_connect_ack] [--disconnect] @@ -280,6 +286,7 @@ abstract class DefaultReconnectionStrategyBase implements ReconnectionStrategy: do_connect transport/ActivityMonitoringTransport --reuse_connection/bool=false [--reconnect_transport] + [--disconnect_transport] [--send_connect] [--receive_connect_ack]: attempt_counter := -1 @@ -306,6 +313,7 @@ abstract class DefaultReconnectionStrategyBase implements ReconnectionStrategy: sleep_duration = attempt_delays_[attempt_counter - 1] else: sleep_duration = delay_lambda_.call attempt_counter + disconnect_transport.call closed_signal_.wait --timeout=sleep_duration if is_closed: return null logger_.debug "Attempting to (re)connect" @@ -327,6 +335,7 @@ abstract class DefaultReconnectionStrategyBase implements ReconnectionStrategy: transport/ActivityMonitoringTransport --is_initial_connection /bool [--reconnect_transport] + [--disconnect_transport] [--send_connect] [--receive_connect_ack] [--disconnect] @@ -364,6 +373,7 @@ class DefaultCleanSessionReconnectionStrategy extends DefaultReconnectionStrateg transport/ActivityMonitoringTransport --is_initial_connection /bool [--reconnect_transport] + [--disconnect_transport] [--send_connect] [--receive_connect_ack] [--disconnect]: @@ -372,6 +382,7 @@ class DefaultCleanSessionReconnectionStrategy extends DefaultReconnectionStrateg session_exists := do_connect transport --reuse_connection = is_initial_connection --reconnect_transport = reconnect_transport + --disconnect_transport = disconnect_transport --send_connect = send_connect --receive_connect_ack = receive_connect_ack if session_exists: @@ -402,12 +413,14 @@ class DefaultSessionReconnectionStrategy extends DefaultReconnectionStrategyBase transport/ActivityMonitoringTransport --is_initial_connection /bool [--reconnect_transport] + [--disconnect_transport] [--send_connect] [--receive_connect_ack] [--disconnect]: session_exists := do_connect transport --reuse_connection = is_initial_connection --reconnect_transport = reconnect_transport + --disconnect_transport = disconnect_transport --send_connect = send_connect --receive_connect_ack = receive_connect_ack @@ -468,12 +481,14 @@ class TenaciousReconnectionStrategy extends DefaultReconnectionStrategyBase: transport/ActivityMonitoringTransport --is_initial_connection /bool [--reconnect_transport] + [--disconnect_transport] [--send_connect] [--receive_connect_ack] [--disconnect]: session_exists := do_connect transport --reuse_connection = is_initial_connection --reconnect_transport = reconnect_transport + --disconnect_transport = disconnect_transport --send_connect = send_connect --receive_connect_ack = receive_connect_ack @@ -1136,6 +1151,8 @@ class FullClient: --reconnect_transport = : transport_.reconnect connection_ = Connection_ transport_ --keep_alive=session_.options.keep_alive + --disconnect_transport = : + transport_.disconnect --send_connect = : packet := ConnectPacket session_.options.client_id --clean_session=session_.options.clean_session @@ -1186,4 +1203,3 @@ class FullClient: if reconnection_strategy_: reconnection_strategy_.close connection_.close if not handling_latch_.has_value: handling_latch_.set false - diff --git a/src/tcp.toit b/src/tcp.toit index 244199d..adea354 100644 --- a/src/tcp.toit +++ b/src/tcp.toit @@ -25,14 +25,28 @@ class TcpTransport implements Transport BrokerTransport: constructor socket/tcp.Socket: socket_ = socket + /** Deprecated. Use $(constructor --net_open --host) instead. */ constructor network/net.Interface --host/string --port/int=1883: - return ReconnectingTransport_ network --host=host --port=port + return ReconnectingTransport_ network --net_open=null --host=host --port=port + constructor --net_open/Lambda --host/string --port/int=1883: + return ReconnectingTransport_ null --net_open=net_open --host=host --port=port + + /** Deprecated. Use $(TcpTransport.tls --net_open --host) instead. */ constructor.tls network/net.Interface --host/string --port/int=8883 --root_certificates/List=[] --server_name/string?=null --certificate/tls.Certificate?=null: - return ReconnectingTlsTransport_ network --host=host --port=port + return ReconnectingTlsTransport_ network --net_open=null --host=host --port=port + --root_certificates=root_certificates + --server_name=server_name + --certificate=certificate + + constructor.tls --net_open/Lambda --host/string --port/int=8883 + --root_certificates/List=[] + --server_name/string?=null + --certificate/tls.Certificate?=null: + return ReconnectingTlsTransport_ null --net_open=net_open --host=host --port=port --root_certificates=root_certificates --server_name=server_name --certificate=certificate @@ -58,17 +72,24 @@ class TcpTransport implements Transport BrokerTransport: reconnect -> none: throw "UNSUPPORTED" + disconnect -> none: + throw "UNSUPPORTED" + class ReconnectingTransport_ extends TcpTransport: // Reconnection information. - network_ /net.Interface + network_ /net.Interface? := null host_ /string port_ /int + open_ /Lambda? reconnecting_mutex_ /monitor.Mutex := monitor.Mutex - constructor .network_ --host/string --port/int=1883: + constructor .network_ --net_open/Lambda? --host/string --port/int=1883: + if not network_ and not net_open: throw "Either network or net_open must be provided" + if network_ and net_open: throw "Only one of network or net_open must be provided" host_ = host port_ = port + open_ = net_open super.from_subclass_ null reconnect @@ -78,7 +99,15 @@ class ReconnectingTransport_ extends TcpTransport: read -> ByteArray?: return socket_.read + close -> none: + super + // Only close the network if we were the ones who opened it. + if open_ and network_: + network_.close + network_ = null + reconnect: + if not network_: network_ = open_.call old_socket := socket_ reconnecting_mutex_.do: if not identical old_socket socket_: return @@ -105,19 +134,23 @@ class ReconnectingTransport_ extends TcpTransport: supports_reconnect -> bool: return true + disconnect: + if not open_: return + if network_: network_.close + class ReconnectingTlsTransport_ extends ReconnectingTransport_: certificate_ /tls.Certificate? server_name_ /string? root_certificates_ /List - constructor network/net.Interface --host/string --port/int + constructor network/net.Interface? --net_open/Lambda? --host/string --port/int --root_certificates/List=[] --server_name/string?=null --certificate/tls.Certificate?=null: root_certificates_ = root_certificates server_name_ = server_name certificate_ = certificate - super network --host=host --port=port + super network --net_open=net_open --host=host --port=port new_connection_ -> tcp.Socket: socket := network_.tcp_connect host_ port_ diff --git a/src/transport.toit b/src/transport.toit index 22f29fd..46bc421 100644 --- a/src/transport.toit +++ b/src/transport.toit @@ -44,6 +44,16 @@ interface Transport implements reader.Reader: */ reconnect -> none + /** + Disconnects the transport. + + This can be called before doing a $reconnect. It will close the network, if + that's supported. + Disconnecting is not the same as closing. A transport can be disconnected + and still be open. + */ + disconnect -> none + /** Whether the transport is closed. If it $supports_reconnect then calling $reconnect reopens the transport. @@ -99,5 +109,8 @@ class ActivityMonitoringTransport implements Transport: reconnect -> none: wrapped_transport_.reconnect + disconnect -> none: + wrapped_transport_.disconnect + is_closed -> bool: return wrapped_transport_.is_closed diff --git a/tests/broker_mosquitto.toit b/tests/broker_mosquitto.toit index 9b9084c..7b3914b 100644 --- a/tests/broker_mosquitto.toit +++ b/tests/broker_mosquitto.toit @@ -90,7 +90,7 @@ with_mosquitto --logger/log.Logger [block]: sleep --ms=(50*i) try: - block.call:: mqtt.TcpTransport network --host="localhost" --port=port + block.call:: mqtt.TcpTransport --net_open=(:: net.open) --host="localhost" --port=port finally: | is_exception _ | pid := mosquitto_fork_data[3] logger.info "killing mosquitto server" @@ -106,4 +106,4 @@ Can sometimes be useful, as the logging is better. */ with_external_mosquitto --logger/log.Logger [block]: network := net.open - block.call:: mqtt.TcpTransport network --host="localhost" --port=1883 + block.call:: mqtt.TcpTransport --net_open=(:: net.open) --host="localhost" --port=1883 diff --git a/tests/max_inflight_test.toit b/tests/max_inflight_test.toit index f8799fc..93f95e7 100644 --- a/tests/max_inflight_test.toit +++ b/tests/max_inflight_test.toit @@ -19,6 +19,7 @@ class TestTransport implements mqtt.Transport: wrapped_ /mqtt.Transport on_reconnect /Lambda? := null + on_disconnect /Lambda? := null on_write /Lambda? := null on_read /Lambda? := null @@ -40,6 +41,10 @@ class TestTransport implements mqtt.Transport: if on_reconnect: on_reconnect.call wrapped_.reconnect + disconnect -> none: + if on_disconnect: on_disconnect.call + wrapped_.disconnect + is_closed -> bool: return wrapped_.is_closed /** diff --git a/tests/package.lock b/tests/package.lock index 8fbc72b..056f7b1 100644 --- a/tests/package.lock +++ b/tests/package.lock @@ -7,5 +7,6 @@ packages: path: .. pkg-host: url: github.com/toitlang/pkg-host + name: host version: 1.6.0 hash: d05b91390e76c3543a9968b042aed330210bafa4 diff --git a/tests/ping_test.toit b/tests/ping_test.toit index 17bb184..63532a5 100644 --- a/tests/ping_test.toit +++ b/tests/ping_test.toit @@ -66,6 +66,7 @@ class SlowTransport implements mqtt.Transport: close -> none: wrapped_.close supports_reconnect -> bool: return wrapped_.supports_reconnect reconnect -> none: wrapped_.reconnect + disconnect -> none: wrapped_.disconnect is_closed -> bool: return wrapped_.is_closed /** diff --git a/tests/reconnect_close_test.toit b/tests/reconnect_close_test.toit index f2460f5..79ec96e 100644 --- a/tests/reconnect_close_test.toit +++ b/tests/reconnect_close_test.toit @@ -15,40 +15,15 @@ import .broker_mosquitto import .packet_test_client import .transport -class TestTransport implements mqtt.Transport: - wrapped_ /mqtt.Transport - - on_reconnect /Lambda? := null - on_write /Lambda? := null - on_read /Lambda? := null - - constructor .wrapped_: - - write bytes/ByteArray -> int: - if on_write: on_write.call bytes - return wrapped_.write bytes - - read -> ByteArray?: - if on_read: return on_read.call wrapped_ - return wrapped_.read - - close -> none: wrapped_.close - supports_reconnect -> bool: return wrapped_.supports_reconnect - reconnect -> none: - if on_reconnect: on_reconnect.call - wrapped_.reconnect - - is_closed -> bool: return wrapped_.is_closed - /** Tests that the client closes as if it was a forced close if the connection is down. */ test_no_disconnect_packet create_transport/Lambda --logger/log.Logger: - failing_transport /TestTransport? := null + failing_transport /CallbackTestTransport? := null create_failing_transport := :: transport := create_transport.call - failing_transport = TestTransport transport + failing_transport = CallbackTestTransport transport failing_transport // There will be a reconnection attempt immediately when the connection fails. @@ -125,11 +100,11 @@ This is different from $test_no_disconnect_packet, as the client already managed sends a disconnect instead of abruptly closing the connection. */ test_reconnect_before_disconnect_packet create_transport/Lambda --logger/log.Logger: - brittle_transport /TestTransport? := null + brittle_transport /CallbackTestTransport? := null create_brittle_transport := :: transport := create_transport.call - brittle_transport = TestTransport transport + brittle_transport = CallbackTestTransport transport brittle_transport reconnection_strategy := mqtt.DefaultSessionReconnectionStrategy @@ -209,11 +184,11 @@ close_in_handle create_transport/Lambda --logger/log.Logger --force/bool: handle_done.get test_reconnect_after_broker_disconnect create_transport/Lambda --logger/log.Logger: - disconnecting_transport /TestTransport? := null + disconnecting_transport /CallbackTestTransport? := null create_failing_transport := :: transport := create_transport.call - disconnecting_transport = TestTransport transport + disconnecting_transport = CallbackTestTransport transport disconnecting_transport with_packet_client create_failing_transport diff --git a/tests/reconnect_transport_failure_test.toit b/tests/reconnect_transport_failure_test.toit index c07217f..b713794 100644 --- a/tests/reconnect_transport_failure_test.toit +++ b/tests/reconnect_transport_failure_test.toit @@ -25,41 +25,15 @@ main args: run_test := : | create_transport/Lambda | test create_transport --logger=logger with_internal_broker --logger=logger run_test - -class TestTransport implements mqtt.Transport: - wrapped_ /mqtt.Transport - - on_reconnect /Lambda? := null - on_write /Lambda? := null - on_read /Lambda? := null - - constructor .wrapped_: - - write bytes/ByteArray -> int: - if on_write: on_write.call bytes - return wrapped_.write bytes - - read -> ByteArray?: - if on_read: return on_read.call wrapped_ - return wrapped_.read - - close -> none: wrapped_.close - supports_reconnect -> bool: return wrapped_.supports_reconnect - reconnect -> none: - if on_reconnect: on_reconnect.call - wrapped_.reconnect - - is_closed -> bool: return wrapped_.is_closed - /** Tests that the client continues to reconnect if the transport reconnect fails. */ test create_transport/Lambda --logger/log.Logger: - failing_transport /TestTransport? := null + failing_transport /CallbackTestTransport? := null create_failing_transport := :: transport := create_transport.call - failing_transport = TestTransport transport + failing_transport = CallbackTestTransport transport failing_transport reconnection_strategy := mqtt.DefaultSessionReconnectionStrategy diff --git a/tests/reconnection_strategy_tenacious_test.toit b/tests/reconnection_strategy_tenacious_test.toit index 207713e..b2ddf21 100644 --- a/tests/reconnection_strategy_tenacious_test.toit +++ b/tests/reconnection_strategy_tenacious_test.toit @@ -15,33 +15,6 @@ import .broker_mosquitto import .packet_test_client import .transport -class TestTransport implements mqtt.Transport: - wrapped_ /mqtt.Transport - - on_reconnect /Lambda? := null - on_write /Lambda? := null - on_read /Lambda? := null - - constructor .wrapped_: - - write bytes/ByteArray -> int: - if on_write: on_write.call bytes - return wrapped_.write bytes - - read -> ByteArray?: - if on_read: return on_read.call wrapped_ - return wrapped_.read - - close -> none: wrapped_.close - - supports_reconnect -> bool: return wrapped_.supports_reconnect - - reconnect -> none: - if on_reconnect: on_reconnect.call - wrapped_.reconnect - - is_closed -> bool: return wrapped_.is_closed - /** Tests the tenacious reconnection strategy. @@ -49,11 +22,11 @@ The reconnection strategy keeps on trying. The only way to stop it is to close the client. */ test create_transport/Lambda --logger/log.Logger: - failing_transport /TestTransport? := null + failing_transport /CallbackTestTransport? := null create_failing_transport := :: transport := create_transport.call - failing_transport = TestTransport transport + failing_transport = CallbackTestTransport transport failing_transport delay_lambda_client := null // Will be set later. @@ -100,10 +73,16 @@ test create_transport/Lambda --logger/log.Logger: // eventually the packet will be sent. client.publish "eventually_succeeding" #[] --qos=0 - // Allow for 10 reconnection attempts. + was_disconnected := false + failing_transport.on_disconnect = :: + was_disconnected = true + + // Allow for 10 reconnection attempts. 10.repeat: delay_lambda_semaphore.up 10.repeat: delay_lambda_called_semaphore.down + expect was_disconnected + // Switch back to just having the write fail. is_reconnect_failing = false 10.repeat: delay_lambda_semaphore.up diff --git a/tests/transport.toit b/tests/transport.toit index d73121f..1a335f7 100644 --- a/tests/transport.toit +++ b/tests/transport.toit @@ -36,6 +36,8 @@ class TestClientTransport implements mqtt.Transport: reconnect -> none: pipe_ = server_.connect + disconnect -> none: + is_closed -> bool: return pipe_.client_is_closed @@ -261,6 +263,10 @@ class TestTransport implements mqtt.Transport: remaining_to_write_ = 0 start_reading_ + disconnect -> none: + activity_.add [ "disconnect", Time.monotonic_us ] + wrapped_.disconnect + is_closed -> bool: return wrapped_.is_closed @@ -269,3 +275,35 @@ class TestTransport implements mqtt.Transport: activity -> List: return activity_ + +class CallbackTestTransport implements mqtt.Transport: + wrapped_ /mqtt.Transport + + on_reconnect /Lambda? := null + on_disconnect /Lambda? := null + on_write /Lambda? := null + on_read /Lambda? := null + + constructor .wrapped_: + + write bytes/ByteArray -> int: + if on_write: on_write.call bytes + return wrapped_.write bytes + + read -> ByteArray?: + if on_read: return on_read.call wrapped_ + return wrapped_.read + + close -> none: wrapped_.close + + supports_reconnect -> bool: return wrapped_.supports_reconnect + + reconnect -> none: + if on_reconnect: on_reconnect.call + wrapped_.reconnect + + disconnect -> none: + if on_disconnect: on_disconnect.call + wrapped_.disconnect + + is_closed -> bool: return wrapped_.is_closed