Skip to content

Commit 9e1c6a0

Browse files
committed
connectd: simplify logic, and add a "reconnected" message.
One issue we have in CI is reconnection races: if an incoming connection arrives while an outgoing one is being negotiated, we close the outgoing one and issue a disconnect, which fails any connect attempts. By sending a "reconnected" message instead of a disconnect followed by a connect, we avoid disturbing in-progress connection attempts; this race happens in CI quite a bit. Signed-off-by: Rusty Russell <[email protected]>
1 parent 44cdf61 commit 9e1c6a0

File tree

13 files changed

+255
-171
lines changed

13 files changed

+255
-171
lines changed

connectd/connectd.c

Lines changed: 55 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -100,31 +100,11 @@ static struct connecting *find_connecting(struct daemon *daemon,
100100
return connecting_htable_get(daemon->connecting, id);
101101
}
102102

103-
/*~ When we free a peer, we remove it from the daemon's hashtable.
104-
* We also call this manually if we want to elegantly drain peer's
105-
* queues. */
106-
void destroy_peer(struct peer *peer)
103+
/*~ When we free a peer, we remove it from the daemon's hashtable. */
104+
static void destroy_peer(struct peer *peer)
107105
{
108-
assert(!peer->draining);
109-
110106
if (!peer_htable_del(peer->daemon->peers, peer))
111107
abort();
112-
113-
/* Tell gossipd to stop asking this peer gossip queries */
114-
daemon_conn_send(peer->daemon->gossipd,
115-
take(towire_gossipd_peer_gone(NULL, &peer->id)));
116-
117-
/* Tell lightningd it's really disconnected */
118-
daemon_conn_send(peer->daemon->master,
119-
take(towire_connectd_peer_disconnect_done(NULL,
120-
&peer->id,
121-
peer->counter)));
122-
/* This makes multiplex.c routines not feed us more, but
123-
* *also* means that if we're freed directly, the ->to_peer
124-
* destructor won't call drain_peer(). */
125-
peer->draining = true;
126-
127-
schedule_reconnect_if_important(peer->daemon, &peer->id);
128108
}
129109

130110
/*~ This is where we create a new peer. */
@@ -277,6 +257,22 @@ static void reset_reconnect_timer(struct peer *peer)
277257
imp->reconnect_secs = INITIAL_WAIT_SECONDS;
278258
}
279259

260+
void send_disconnected(struct daemon *daemon,
261+
const struct node_id *id,
262+
u64 connectd_counter)
263+
{
264+
/* lightningd: it's gone */
265+
daemon_conn_send(daemon->master,
266+
take(towire_connectd_peer_disconnected(NULL, id, connectd_counter)));
267+
268+
/* Tell gossipd to stop asking this peer gossip queries */
269+
daemon_conn_send(daemon->gossipd,
270+
take(towire_gossipd_peer_gone(NULL, id)));
271+
272+
/* Start reconnection process if we care */
273+
schedule_reconnect_if_important(daemon, id);
274+
}
275+
280276
/*~ Note the lack of static: this is called by peer_exchange_initmsg.c once the
281277
* INIT messages are exchanged, and also by the retry code above. */
282278
struct io_plan *peer_connected(struct io_conn *conn,
@@ -290,17 +286,20 @@ struct io_plan *peer_connected(struct io_conn *conn,
290286
bool incoming)
291287
{
292288
u8 *msg;
293-
struct peer *peer;
289+
struct peer *peer, *oldpeer;
294290
int unsup;
295291
size_t depender, missing;
296292
int subd_fd;
297293
bool option_gossip_queries;
298294
struct connecting *connect;
295+
u64 prev_connectd_counter;
299296

300297
/* We remove any previous connection immediately, on the assumption it's dead */
301-
peer = peer_htable_get(daemon->peers, id);
302-
if (peer)
303-
tal_free(peer);
298+
oldpeer = peer_htable_get(daemon->peers, id);
299+
if (oldpeer) {
300+
prev_connectd_counter = oldpeer->counter;
301+
destroy_peer_immediately(oldpeer);
302+
}
304303

305304
/* We promised we'd take it by marking it TAKEN above; prepare to free it. */
306305
if (taken(their_features))
@@ -318,6 +317,9 @@ struct io_plan *peer_connected(struct io_conn *conn,
318317
unsup = features_unsupported(daemon->our_features, their_features,
319318
INIT_FEATURE);
320319
if (unsup != -1) {
320+
/* We were going to send a reconnect message, but not now! */
321+
if (oldpeer)
322+
send_disconnected(daemon, id, prev_connectd_counter);
321323
status_peer_unusual(id, "Unsupported feature %u", unsup);
322324
msg = towire_warningfmt(NULL, NULL, "Unsupported feature %u",
323325
unsup);
@@ -326,6 +328,9 @@ struct io_plan *peer_connected(struct io_conn *conn,
326328
}
327329

328330
if (!feature_check_depends(their_features, &depender, &missing)) {
331+
/* We were going to send a reconnect message, but not now! */
332+
if (oldpeer)
333+
send_disconnected(daemon, id, prev_connectd_counter);
329334
status_peer_unusual(id, "Feature %zu requires feature %zu",
330335
depender, missing);
331336
msg = towire_warningfmt(NULL, NULL,
@@ -362,23 +367,37 @@ struct io_plan *peer_connected(struct io_conn *conn,
362367
peer = new_peer(daemon, id, cs, their_features, is_websocket, conn,
363368
&subd_fd);
364369
/* Only takes over conn if it succeeds. */
365-
if (!peer)
370+
if (!peer) {
371+
/* We were going to send a reconnect message, but not now! */
372+
if (oldpeer)
373+
send_disconnected(daemon, id, prev_connectd_counter);
366374
return io_close(conn);
375+
}
367376

368377
/* Tell gossipd it can query this new peer for gossip */
369378
option_gossip_queries = feature_negotiated(daemon->our_features,
370379
their_features,
371380
OPT_GOSSIP_QUERIES);
372-
msg = towire_gossipd_new_peer(NULL, id, option_gossip_queries);
373-
daemon_conn_send(daemon->gossipd, take(msg));
374381

375382
/* Get ready for streaming gossip from the store */
376383
setup_peer_gossip_store(peer, daemon->our_features, their_features);
377384

378-
/* Create message to tell master peer has connected. */
379-
msg = towire_connectd_peer_connected(NULL, id, peer->counter,
380-
addr, remote_addr,
381-
incoming, their_features);
385+
/* Create message to tell master peer has connected/reconnected. */
386+
if (oldpeer) {
387+
msg = towire_connectd_peer_reconnected(NULL, id,
388+
prev_connectd_counter,
389+
peer->counter,
390+
addr, remote_addr,
391+
incoming, their_features);
392+
} else {
393+
/* Tell gossipd about new peer */
394+
msg = towire_gossipd_new_peer(NULL, id, option_gossip_queries);
395+
daemon_conn_send(daemon->gossipd, take(msg));
396+
397+
msg = towire_connectd_peer_connected(NULL, id, peer->counter,
398+
addr, remote_addr,
399+
incoming, their_features);
400+
}
382401

383402
/*~ daemon_conn is a message queue for inter-daemon communication: we
384403
* queue up the `connect_peer_connected` message to tell lightningd
@@ -1923,9 +1942,7 @@ static void peer_disconnect(struct daemon *daemon, const u8 *msg)
19231942
if (peer->counter != counter)
19241943
return;
19251944

1926-
/* We make sure any final messages from the subds are sent! */
1927-
status_peer_debug(&id, "disconnect_peer");
1928-
drain_peer(peer);
1945+
disconnect_peer(peer);
19291946
}
19301947

19311948
/* lightningd tells us a peer is no longer "important". */
@@ -2371,7 +2388,8 @@ static struct io_plan *recv_req(struct io_conn *conn,
23712388
case WIRE_CONNECTD_PING_DONE:
23722389
case WIRE_CONNECTD_GOT_ONIONMSG_TO_US:
23732390
case WIRE_CONNECTD_CUSTOMMSG_IN:
2374-
case WIRE_CONNECTD_PEER_DISCONNECT_DONE:
2391+
case WIRE_CONNECTD_PEER_DISCONNECTED:
2392+
case WIRE_CONNECTD_PEER_RECONNECTED:
23752393
case WIRE_CONNECTD_START_SHUTDOWN_REPLY:
23762394
case WIRE_CONNECTD_INJECT_ONIONMSG_REPLY:
23772395
case WIRE_CONNECTD_ONIONMSG_FORWARD_FAIL:

connectd/connectd.h

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,15 +60,15 @@ struct peer {
6060
/* Counters and keys for symmetric crypto */
6161
struct crypto_state cs;
6262

63-
/* Connection to the peer */
63+
/* Connection to the peer (NULL if it's disconnected and we're flushing) */
6464
struct io_conn *to_peer;
6565

66-
/* Counter to distinguish this connection from the next re-connection */
67-
u64 counter;
68-
6966
/* Is this draining? If so, just keep writing until queue empty */
7067
bool draining;
7168

69+
/* Counter to distinguish this connection from the next re-connection */
70+
u64 counter;
71+
7272
/* Connections to the subdaemons */
7373
struct subd **subds;
7474

@@ -376,8 +376,13 @@ struct io_plan *peer_connected(struct io_conn *conn,
376376
enum is_websocket is_websocket,
377377
bool incoming);
378378

379-
/* Removes peer from hash table, tells gossipd and lightningd. */
380-
void destroy_peer(struct peer *peer);
379+
/* Tell gossipd and lightningd that this peer is gone. */
380+
void send_disconnected(struct daemon *daemon,
381+
const struct node_id *id,
382+
u64 connectd_counter);
383+
384+
/* Free peer immediately (don't wait for draining). */
385+
void destroy_peer_immediately(struct peer *peer);
381386

382387
/* Remove a random connection, when under stress. */
383388
void close_random_connection(struct daemon *daemon);

connectd/connectd_wire.csv

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -84,9 +84,20 @@ msgdata,connectd_peer_connected,flen,u16,
8484
msgdata,connectd_peer_connected,features,u8,flen
8585

8686
# connectd -> master: peer disconnected.
87-
msgtype,connectd_peer_disconnect_done,2006
88-
msgdata,connectd_peer_disconnect_done,id,node_id,
89-
msgdata,connectd_peer_disconnect_done,counter,u64,
87+
msgtype,connectd_peer_disconnected,2006
88+
msgdata,connectd_peer_disconnected,id,node_id,
89+
msgdata,connectd_peer_disconnected,counter,u64,
90+
91+
# connectd -> master: peer reconnected (disconnect & connect)
92+
msgtype,connectd_peer_reconnected,2010
93+
msgdata,connectd_peer_reconnected,id,node_id,
94+
msgdata,connectd_peer_reconnected,prev_counter,u64,
95+
msgdata,connectd_peer_reconnected,counter,u64,
96+
msgdata,connectd_peer_reconnected,addr,wireaddr_internal,
97+
msgdata,connectd_peer_reconnected,remote_addr,?wireaddr,
98+
msgdata,connectd_peer_reconnected,incoming,bool,
99+
msgdata,connectd_peer_reconnected,flen,u16,
100+
msgdata,connectd_peer_reconnected,features,u8,flen
90101

91102
# Master -> connectd: make peer active immediately (we want to talk) (+ fd to subd).
92103
msgtype,connectd_peer_connect_subd,2004

0 commit comments

Comments
 (0)