Skip to content

Commit f96634f

Browse files
committed
connectd: fix race where last msg can still get lost.
openingd sends an ERROR, and exits. lightningd tells us to disconnect. We read from lightningd first, and don't read from openingd. We need to drain subds when we're told to disconnect.
1 parent aed48c0 commit f96634f

File tree

3 files changed

+59
-20
lines changed

3 files changed

+59
-20
lines changed

connectd/connectd.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ static struct peer *new_peer(struct daemon *daemon,
126126
peer->peer_in = NULL;
127127
peer->sent_to_peer = NULL;
128128
peer->urgent = false;
129-
peer->draining = false;
129+
peer->draining_state = NOT_DRAINING;
130130
peer->peer_in_lastmsg = -1;
131131
peer->peer_outq = msg_queue_new(peer, false);
132132
peer->last_recv_time = time_now();

connectd/connectd.h

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,15 @@ enum pong_expect_type {
4646
PONG_EXPECTED_PROBING = 2,
4747
};
4848

49+
enum draining_state {
50+
/* Normal state */
51+
NOT_DRAINING,
52+
/* First, reading remaining messages from subds */
53+
READING_FROM_SUBDS,
54+
/* Finally, writing any queued messages to peer */
55+
WRITING_TO_PEER,
56+
};
57+
4958
/*~ We keep a hash table (ccan/htable) of peers, which tells us what peers are
5059
* already connected (by peer->id). */
5160
struct peer {
@@ -63,8 +72,8 @@ struct peer {
6372
/* Connection to the peer (NULL if it's disconnected and we're flushing) */
6473
struct io_conn *to_peer;
6574

66-
/* Is this draining? If so, just keep writing until queue empty */
67-
bool draining;
75+
/* Non-zero if shutting down. */
76+
enum draining_state draining_state;
6877

6978
/* Counter to distinguish this connection from the next re-connection */
7079
u64 counter;

connectd/multiplex.c

Lines changed: 47 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ static void close_peer_io_timeout(struct peer *peer)
103103

104104
static void close_subd_timeout(struct subd *subd)
105105
{
106-
status_peer_debug(&subd->peer->id, "Subd did not close, forcing close");
106+
status_peer_broken(&subd->peer->id, "Subd did not close, forcing close");
107107
io_close(subd->conn);
108108
}
109109

@@ -118,7 +118,7 @@ static void drain_peer(struct peer *peer)
118118
assert(tal_count(peer->subds) == 0);
119119

120120
/* You have five seconds to drain. */
121-
peer->draining = true;
121+
peer->draining_state = WRITING_TO_PEER;
122122
status_peer_debug(&peer->id, "disconnect_peer: draining with 5 second timer.");
123123
notleak(new_reltimer(&peer->daemon->timers,
124124
peer->to_peer, time_from_sec(5),
@@ -131,19 +131,33 @@ static void drain_peer(struct peer *peer)
131131

132132
void disconnect_peer(struct peer *peer)
133133
{
134-
/* Free all the subds immediately */
134+
peer->draining_state = READING_FROM_SUBDS;
135+
135136
for (size_t i = 0; i < tal_count(peer->subds); i++) {
136-
/* Once conn exists, subd is a child of the conn. Free conn, free subd. */
137+
/* Start timer in case it doesn't close by itself */
137138
if (peer->subds[i]->conn) {
138-
tal_del_destructor(peer->subds[i], destroy_connected_subd);
139-
tal_free(peer->subds[i]->conn);
139+
status_peer_debug(&peer->id, "disconnect_peer: setting 5 second timer for subd %zu/%zu.",
140+
i, tal_count(peer->subds));
141+
notleak(new_reltimer(&peer->daemon->timers, peer->subds[i],
142+
time_from_sec(5),
143+
close_subd_timeout, peer->subds[i]));
140144
} else {
141145
/* We told lightningd that peer spoke, but it hasn't returned yet. */
142-
tal_free(peer->subds[i]);
146+
tal_arr_remove(&peer->subds, i);
147+
i--;
143148
}
144149
}
145-
tal_resize(&peer->subds, 0);
146-
drain_peer(peer);
150+
151+
if (tal_count(peer->subds) != 0) {
152+
status_peer_debug(&peer->id, "disconnect_peer: waking %zu subds.",
153+
tal_count(peer->subds));
154+
/* Wake them up so we read again */
155+
io_wake(&peer->subds);
156+
} else {
157+
status_peer_debug(&peer->id, "disconnect_peer: no subds, draining now.");
158+
/* No subds left, start draining peer */
159+
drain_peer(peer);
160+
}
147161
}
148162

149163
/* Send warning, close connection to peer */
@@ -162,6 +176,18 @@ static void send_warning(struct peer *peer, const char *fmt, ...)
162176

163177
inject_peer_msg(peer, take(msg));
164178

179+
/* Free all the subds immediately */
180+
for (size_t i = 0; i < tal_count(peer->subds); i++) {
181+
/* Once conn exists, subd is a child of the conn. Free conn, free subd. */
182+
if (peer->subds[i]->conn) {
183+
tal_del_destructor(peer->subds[i], destroy_connected_subd);
184+
tal_free(peer->subds[i]->conn);
185+
} else {
186+
/* We told lightningd that peer spoke, but it hasn't returned yet. */
187+
tal_free(peer->subds[i]);
188+
}
189+
}
190+
tal_resize(&peer->subds, 0);
165191
disconnect_peer(peer);
166192
}
167193

@@ -1061,7 +1087,7 @@ static struct io_plan *write_to_peer(struct io_conn *peer_conn,
10611087
/* Still nothing to send? */
10621088
if (!msg) {
10631089
/* Draining? Shutdown socket (to avoid losing msgs) */
1064-
if (peer->draining) {
1090+
if (peer->draining_state == WRITING_TO_PEER) {
10651091
status_peer_debug(&peer->id, "draining done, shutting down");
10661092
io_wake(&peer->peer_in);
10671093
return io_sock_shutdown(peer_conn);
@@ -1080,7 +1106,7 @@ static struct io_plan *write_to_peer(struct io_conn *peer_conn,
10801106
}
10811107
}
10821108

1083-
if (peer->draining)
1109+
if (peer->draining_state == WRITING_TO_PEER)
10841110
status_peer_debug(&peer->id, "draining, but sending %s.",
10851111
peer_wire_name(fromwire_peektype(msg)));
10861112

@@ -1170,10 +1196,14 @@ static void destroy_connected_subd(struct subd *subd)
11701196
* have been waiting for write_to_subd) */
11711197
io_wake(&peer->peer_in);
11721198

1173-
/* If neither peer nor subds, we're done */
1174-
if (tal_count(peer->subds) == 0 && !peer->to_peer) {
1175-
tal_free(peer);
1176-
return;
1199+
if (tal_count(peer->subds) == 0) {
1200+
if (!peer->to_peer) {
1201+
/* Nothing left */
1202+
tal_free(peer);
1203+
} else if (peer->draining_state == READING_FROM_SUBDS) {
1204+
/* We've finished draining subds, start draining peer */
1205+
drain_peer(peer);
1206+
}
11771207
}
11781208
}
11791209

@@ -1242,7 +1272,7 @@ static struct io_plan *read_body_from_peer_done(struct io_conn *peer_conn,
12421272
peer->last_recv_time = time_now();
12431273

12441274
/* Don't process packets while we're closing */
1245-
if (peer->draining)
1275+
if (peer->draining_state != NOT_DRAINING)
12461276
return next_read(peer_conn, peer);
12471277

12481278
/* If we swallow this, just try again. */
@@ -1464,7 +1494,7 @@ void peer_connect_subd(struct daemon *daemon, const u8 *msg, int fd)
14641494
}
14651495

14661496
/* Could be disconnecting now */
1467-
if (!peer->to_peer || peer->draining) {
1497+
if (!peer->to_peer || peer->draining_state != NOT_DRAINING) {
14681498
close(fd);
14691499
return;
14701500
}

0 commit comments

Comments
 (0)