Skip to content

Commit

Permalink
Disconnect the network before reconnects. (#77)
Browse files Browse the repository at this point in the history
  • Loading branch information
floitsch authored Sep 27, 2023
1 parent b288d6f commit c38a0a4
Show file tree
Hide file tree
Showing 12 changed files with 166 additions and 99 deletions.
34 changes: 33 additions & 1 deletion src/client.toit
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
// found in the LICENSE file.
import log
import net
import tls
import .full_client
import .session_options
import .last_will
Expand All @@ -22,7 +24,7 @@ class Client:
Constructs a new routing MQTT client.
The $transport parameter is used to send messages and is usually a TCP socket instance.
See $TcpTransport.
See $(constructor --host), $Client.tls, and $TcpTransport.
The $routes must be a map from topic (of type $string) to callback. After the client started, it
will automatically subscribe to all topics in the map (with a max-qos of 1). If the broker already
Expand All @@ -40,6 +42,36 @@ class Client:
max_qos := 1
subscription_callbacks_.set topic (CallbackEntry_ callback max_qos)

/**
Variant of $(constructor --transport) that connects to the given $host:$port over TCP.
*/
constructor
--host /string
--port /int = 1883
--net_open /Lambda? = (:: net.open)
--logger /log.Logger = log.default
--routes /Map = {:}:
transport := TcpTransport --host=host --port=port --net_open=net_open
return Client --transport=transport --logger=logger --routes=routes

/**
Variant of $(constructor --host) that supports TLS.
*/
constructor.tls
--host /string
--port /int = 8883
--net_open /Lambda? = (:: net.open)
--root_certificates /List = []
--server_name /string? = null
--certificate /tls.Certificate? = null
--logger /log.Logger = log.default
--routes /Map = {:}:
transport := TcpTransport.tls --host=host --port=port --net_open=net_open
--root_certificates=root_certificates
--server_name=server_name
--certificate=certificate
return Client --transport=transport --logger=logger --routes=routes

/**
Variant of $(start --options).
Expand Down
18 changes: 17 additions & 1 deletion src/full_client.toit
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,11 @@ interface ReconnectionStrategy:
The strategy should first call $send_connect, followed by a $receive_connect_ack. If
the connection is unsuccessful, it may retry.
The $disconnect_transport may be called before attempting a reconnect. It shuts down the
network, thus increasing the chance of a successful reconnect. However, a complete
network reconnect takes more time. We recommend to try at least once to reconnect without
a disconnect first, and then call $disconnect_transport for later attempts.
The $receive_connect_ack block returns whether the broker had a session for this client.
The $is_initial_connection is true if this is the first time the client connects to the broker.
Expand All @@ -206,6 +211,7 @@ interface ReconnectionStrategy:
transport/ActivityMonitoringTransport
--is_initial_connection /bool
[--reconnect_transport]
[--disconnect_transport]
[--send_connect]
[--receive_connect_ack]
[--disconnect]
Expand Down Expand Up @@ -280,6 +286,7 @@ abstract class DefaultReconnectionStrategyBase implements ReconnectionStrategy:
do_connect transport/ActivityMonitoringTransport
--reuse_connection/bool=false
[--reconnect_transport]
[--disconnect_transport]
[--send_connect]
[--receive_connect_ack]:
attempt_counter := -1
Expand All @@ -306,6 +313,7 @@ abstract class DefaultReconnectionStrategyBase implements ReconnectionStrategy:
sleep_duration = attempt_delays_[attempt_counter - 1]
else:
sleep_duration = delay_lambda_.call attempt_counter
disconnect_transport.call
closed_signal_.wait --timeout=sleep_duration
if is_closed: return null
logger_.debug "Attempting to (re)connect"
Expand All @@ -327,6 +335,7 @@ abstract class DefaultReconnectionStrategyBase implements ReconnectionStrategy:
transport/ActivityMonitoringTransport
--is_initial_connection /bool
[--reconnect_transport]
[--disconnect_transport]
[--send_connect]
[--receive_connect_ack]
[--disconnect]
Expand Down Expand Up @@ -364,6 +373,7 @@ class DefaultCleanSessionReconnectionStrategy extends DefaultReconnectionStrateg
transport/ActivityMonitoringTransport
--is_initial_connection /bool
[--reconnect_transport]
[--disconnect_transport]
[--send_connect]
[--receive_connect_ack]
[--disconnect]:
Expand All @@ -372,6 +382,7 @@ class DefaultCleanSessionReconnectionStrategy extends DefaultReconnectionStrateg
session_exists := do_connect transport
--reuse_connection = is_initial_connection
--reconnect_transport = reconnect_transport
--disconnect_transport = disconnect_transport
--send_connect = send_connect
--receive_connect_ack = receive_connect_ack
if session_exists:
Expand Down Expand Up @@ -402,12 +413,14 @@ class DefaultSessionReconnectionStrategy extends DefaultReconnectionStrategyBase
transport/ActivityMonitoringTransport
--is_initial_connection /bool
[--reconnect_transport]
[--disconnect_transport]
[--send_connect]
[--receive_connect_ack]
[--disconnect]:
session_exists := do_connect transport
--reuse_connection = is_initial_connection
--reconnect_transport = reconnect_transport
--disconnect_transport = disconnect_transport
--send_connect = send_connect
--receive_connect_ack = receive_connect_ack

Expand Down Expand Up @@ -468,12 +481,14 @@ class TenaciousReconnectionStrategy extends DefaultReconnectionStrategyBase:
transport/ActivityMonitoringTransport
--is_initial_connection /bool
[--reconnect_transport]
[--disconnect_transport]
[--send_connect]
[--receive_connect_ack]
[--disconnect]:
session_exists := do_connect transport
--reuse_connection = is_initial_connection
--reconnect_transport = reconnect_transport
--disconnect_transport = disconnect_transport
--send_connect = send_connect
--receive_connect_ack = receive_connect_ack

Expand Down Expand Up @@ -1136,6 +1151,8 @@ class FullClient:
--reconnect_transport = :
transport_.reconnect
connection_ = Connection_ transport_ --keep_alive=session_.options.keep_alive
--disconnect_transport = :
transport_.disconnect
--send_connect = :
packet := ConnectPacket session_.options.client_id
--clean_session=session_.options.clean_session
Expand Down Expand Up @@ -1186,4 +1203,3 @@ class FullClient:
if reconnection_strategy_: reconnection_strategy_.close
connection_.close
if not handling_latch_.has_value: handling_latch_.set false

45 changes: 39 additions & 6 deletions src/tcp.toit
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,28 @@ class TcpTransport implements Transport BrokerTransport:
constructor socket/tcp.Socket:
socket_ = socket

/** Deprecated. Use $(constructor --net_open --host) instead. */
constructor network/net.Interface --host/string --port/int=1883:
return ReconnectingTransport_ network --host=host --port=port
return ReconnectingTransport_ network --net_open=null --host=host --port=port

constructor --net_open/Lambda --host/string --port/int=1883:
return ReconnectingTransport_ null --net_open=net_open --host=host --port=port

/** Deprecated. Use $(TcpTransport.tls --net_open --host) instead. */
constructor.tls network/net.Interface --host/string --port/int=8883
--root_certificates/List=[]
--server_name/string?=null
--certificate/tls.Certificate?=null:
return ReconnectingTlsTransport_ network --host=host --port=port
return ReconnectingTlsTransport_ network --net_open=null --host=host --port=port
--root_certificates=root_certificates
--server_name=server_name
--certificate=certificate

constructor.tls --net_open/Lambda --host/string --port/int=8883
--root_certificates/List=[]
--server_name/string?=null
--certificate/tls.Certificate?=null:
return ReconnectingTlsTransport_ null --net_open=net_open --host=host --port=port
--root_certificates=root_certificates
--server_name=server_name
--certificate=certificate
Expand All @@ -58,17 +72,24 @@ class TcpTransport implements Transport BrokerTransport:
reconnect -> none:
throw "UNSUPPORTED"

disconnect -> none:
throw "UNSUPPORTED"

class ReconnectingTransport_ extends TcpTransport:
// Reconnection information.
network_ /net.Interface
network_ /net.Interface? := null
host_ /string
port_ /int
open_ /Lambda?

reconnecting_mutex_ /monitor.Mutex := monitor.Mutex

constructor .network_ --host/string --port/int=1883:
constructor .network_ --net_open/Lambda? --host/string --port/int=1883:
if not network_ and not net_open: throw "Either network or net_open must be provided"
if network_ and net_open: throw "Only one of network or net_open must be provided"
host_ = host
port_ = port
open_ = net_open
super.from_subclass_ null
reconnect

Expand All @@ -78,7 +99,15 @@ class ReconnectingTransport_ extends TcpTransport:
read -> ByteArray?:
return socket_.read

close -> none:
super
// Only close the network if we were the ones who opened it.
if open_ and network_:
network_.close
network_ = null

reconnect:
if not network_: network_ = open_.call
old_socket := socket_
reconnecting_mutex_.do:
if not identical old_socket socket_: return
Expand All @@ -105,19 +134,23 @@ class ReconnectingTransport_ extends TcpTransport:
supports_reconnect -> bool:
return true

disconnect:
if not open_: return
if network_: network_.close

class ReconnectingTlsTransport_ extends ReconnectingTransport_:
certificate_ /tls.Certificate?
server_name_ /string?
root_certificates_ /List

constructor network/net.Interface --host/string --port/int
constructor network/net.Interface? --net_open/Lambda? --host/string --port/int
--root_certificates/List=[]
--server_name/string?=null
--certificate/tls.Certificate?=null:
root_certificates_ = root_certificates
server_name_ = server_name
certificate_ = certificate
super network --host=host --port=port
super network --net_open=net_open --host=host --port=port

new_connection_ -> tcp.Socket:
socket := network_.tcp_connect host_ port_
Expand Down
13 changes: 13 additions & 0 deletions src/transport.toit
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,16 @@ interface Transport implements reader.Reader:
*/
reconnect -> none

/**
Disconnects the transport.
This can be called before doing a $reconnect. It will close the network, if
that's supported.
Disconnecting is not the same as closing. A transport can be disconnected
and still be open.
*/
disconnect -> none

/**
Whether the transport is closed.
If it $supports_reconnect then calling $reconnect reopens the transport.
Expand Down Expand Up @@ -99,5 +109,8 @@ class ActivityMonitoringTransport implements Transport:
reconnect -> none:
wrapped_transport_.reconnect

disconnect -> none:
wrapped_transport_.disconnect

is_closed -> bool:
return wrapped_transport_.is_closed
4 changes: 2 additions & 2 deletions tests/broker_mosquitto.toit
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ with_mosquitto --logger/log.Logger [block]:
sleep --ms=(50*i)

try:
block.call:: mqtt.TcpTransport network --host="localhost" --port=port
block.call:: mqtt.TcpTransport --net_open=(:: net.open) --host="localhost" --port=port
finally: | is_exception _ |
pid := mosquitto_fork_data[3]
logger.info "killing mosquitto server"
Expand All @@ -106,4 +106,4 @@ Can sometimes be useful, as the logging is better.
*/
with_external_mosquitto --logger/log.Logger [block]:
network := net.open
block.call:: mqtt.TcpTransport network --host="localhost" --port=1883
block.call:: mqtt.TcpTransport --net_open=(:: net.open) --host="localhost" --port=1883
5 changes: 5 additions & 0 deletions tests/max_inflight_test.toit
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class TestTransport implements mqtt.Transport:
wrapped_ /mqtt.Transport

on_reconnect /Lambda? := null
on_disconnect /Lambda? := null
on_write /Lambda? := null
on_read /Lambda? := null

Expand All @@ -40,6 +41,10 @@ class TestTransport implements mqtt.Transport:
if on_reconnect: on_reconnect.call
wrapped_.reconnect

disconnect -> none:
if on_disconnect: on_disconnect.call
wrapped_.disconnect

is_closed -> bool: return wrapped_.is_closed

/**
Expand Down
1 change: 1 addition & 0 deletions tests/package.lock
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ packages:
path: ..
pkg-host:
url: github.com/toitlang/pkg-host
name: host
version: 1.6.0
hash: d05b91390e76c3543a9968b042aed330210bafa4
1 change: 1 addition & 0 deletions tests/ping_test.toit
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class SlowTransport implements mqtt.Transport:
close -> none: wrapped_.close
supports_reconnect -> bool: return wrapped_.supports_reconnect
reconnect -> none: wrapped_.reconnect
disconnect -> none: wrapped_.disconnect
is_closed -> bool: return wrapped_.is_closed

/**
Expand Down
Loading

0 comments on commit c38a0a4

Please sign in to comment.