Close backend connection when frontend is not found #417
Conversation
Actually, it's possible this was already fixed by #386. Even if that's the case, I think this is the correct way of handling this error, should it ever occur.
It occurred to me that we should probably apply this same change everywhere a data packet is handled (client & agent too).
Force-pushed from 705f70e to 17d661b.
var wg sync.WaitGroup
verify := func() {
	defer wg.Done()

// run test client
Note to reviewers: This test was reusing the tunnel across multiple requests, which we explicitly say not to do. It worked before because the test only sets up a single backend, and the FE request fits in a single data packet. This PR makes the failure more explicit when the connection is reused, which broke this test.
Force-pushed from 43255e8 to b86eaa4.
Rebased. /assign @jkh52

Fixed. I'm not sure why it wasn't failing before. The order isn't supposed to matter on those mocked calls (they're not in an …)
@@ -339,7 +339,15 @@ func (t *grpcTunnel) serve(tunnelCtx context.Context) {
 	conn, ok := t.conns.get(resp.ConnectID)

 	if !ok {
-		klog.V(1).InfoS("Connection not recognized", "connectionID", resp.ConnectID)
+		klog.ErrorS(nil, "Connection not recognized", "connectionID", resp.ConnectID)
This could be an error, but it can also be a race condition. We used to have this as an error, and it tended to cause undue concern because it typically happens when a connection is being shut down. t.conns missing a reference to connectID usually just indicates that the connection is being shut down, but that the other end sent a data packet before it realized the connection was going away.
Hmm, good point. Especially with the fixes in this PR, it's pretty unlikely this would keep happening in another scenario. I'll change it back.
Not in this PR, but we may want to consider keeping a cache of recently closed connections. I've thought of a few cases where it would be useful.
+1 to keep recent connections to understand this case better.
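As a rough illustration only, a minimal sketch of what such a cache of recently closed connections could look like (entirely hypothetical and not part of this PR; the type name, TTL-based pruning, and method names are assumptions):

```go
import (
	"sync"
	"time"
)

// recentlyClosed remembers connection IDs for a short window after they are
// closed, so a late DATA packet can be distinguished from a truly unknown
// connection. Hypothetical sketch; not code from this PR.
type recentlyClosed struct {
	mu  sync.Mutex
	ids map[int64]time.Time
	ttl time.Duration
}

func newRecentlyClosed(ttl time.Duration) *recentlyClosed {
	return &recentlyClosed{ids: make(map[int64]time.Time), ttl: ttl}
}

// add records that connID was just closed.
func (r *recentlyClosed) add(connID int64) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.ids[connID] = time.Now()
}

// seen reports whether connID was closed within the TTL, pruning stale entries.
func (r *recentlyClosed) seen(connID int64) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	closedAt, ok := r.ids[connID]
	if !ok {
		return false
	}
	if time.Since(closedAt) > r.ttl {
		delete(r.ids, connID)
		return false
	}
	return true
}
```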
pkg/agent/client.go
Outdated
@@ -501,6 +501,18 @@ func (a *Client) Serve() {
 	ctx, ok := a.connManager.Get(data.ConnectID)
 	if ok {
 		ctx.send(data.Data)
+	} else {
+		klog.ErrorS(nil, "received DATA for unrecognized connection", "connectionID", data.ConnectID)
Again, a little concerned about having this as an error.
Done.
for range recvCh {
	// Ignore values as this indicates there was a problem
	// with the remote connection.
}
Can we add a debug message to let us know how many packets we dropped? Might even be worth adding a metric.
Done. Also added to the agent version of this.
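For reference, a minimal sketch of what a drain loop with a dropped-packet count and debug log could look like (variable names and log wording are illustrative, not the PR's exact code):

```go
import (
	"k8s.io/klog/v2"

	"sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client"
)

// drainRecvChannel consumes anything still queued on recvCh so the goroutine
// writing to it is never left blocked, and logs how many packets were dropped.
func drainRecvChannel(recvCh <-chan *client.Packet, connID int64) {
	dropped := 0
	for range recvCh {
		// Anything still queued belongs to a connection that is already
		// being torn down, so the packets themselves are ignored.
		dropped++
	}
	if dropped > 0 {
		klog.V(2).InfoS("Drained pending packets for closed connection",
			"connectionID", connID, "droppedPackets", dropped)
	}
}
```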
pkg/server/server.go
Outdated
 if firstConnID == 0 {
 	firstConnID = connID
 } else if firstConnID != connID {
-	klog.V(5).InfoS("Data does not match first connection id", "fistConnectionID", firstConnID, "connectionID", connID)
+	klog.ErrorS(nil, "Data does not match first connection id", "fistConnectionID", firstConnID, "connectionID", connID)
fist?
Do we believe we have seen this? If so I would very much like to see a metric added.
Fixed. This could happen if a frontend client was attempting to reuse a tunnel. Agree on the metric, but I'd add it as part of a generic connection_closed metric with a close_reason. Basically the post-dial equivalent of the dial_failures metric: #410
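As a rough illustration of that proposal (metric name, namespace, and label values are hypothetical; this PR does not add the metric):

```go
import "github.com/prometheus/client_golang/prometheus"

// connectionsClosed would be the post-dial counterpart to the dial_failures
// metric discussed in #410, partitioned by why the connection was closed.
var connectionsClosed = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Namespace: "konnectivity_network_proxy",
		Subsystem: "server",
		Name:      "connections_closed_total",
		Help:      "Number of proxy connections closed, partitioned by close reason.",
	},
	[]string{"close_reason"},
)

func init() {
	prometheus.MustRegister(connectionsClosed)
}

// Example usage at a close site:
//   connectionsClosed.WithLabelValues("connection_id_mismatch").Inc()
```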
Not sure even a reuse will cause a new connID; the tunnel sets the connID while the tunnel is being set up.
The connection ID is set by the agent at https://github.com/kubernetes-sigs/apiserver-network-proxy/blob/master/pkg/agent/client.go#L404.
That means it will only be created in response to a dial request, which is created by the client at:
Type: client.PacketType_DIAL_REQ,
Within the client that is only called from CreateSingleUseGrpcTunnelWithContext:
func CreateSingleUseGrpcTunnelWithContext(createCtx, tunnelCtx context.Context, address string, opts ...grpc.DialOption) (Tunnel, error) {
So reusing a connection on a tunnel should not create a new connection id.
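To make the single-use contract concrete, here is a sketch of the intended usage pattern (the proxy address, target, and credentials are placeholders, and it assumes the konnectivity-client Tunnel exposes a DialContext method):

```go
import (
	"context"
	"net"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	"sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client"
)

// dialViaProxy creates a fresh tunnel and dials exactly one connection
// through it. Reusing a tunnel for additional requests is the pattern this
// PR makes fail more explicitly.
func dialViaProxy(createCtx, tunnelCtx, requestCtx context.Context, proxyAddr, target string) (net.Conn, error) {
	tunnel, err := client.CreateSingleUseGrpcTunnelWithContext(
		createCtx, tunnelCtx, proxyAddr,
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
	if err != nil {
		return nil, err
	}
	// One dial per tunnel; create a new tunnel for the next connection.
	return tunnel.DialContext(requestCtx, "tcp", target)
}
```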
pkg/server/server.go
Outdated
@@ -856,7 +856,8 @@ func (s *ProxyServer) serveRecvBackend(backend Backend, stream agent.AgentServic
 	klog.V(5).InfoS("Received data from agent", "bytes", len(resp.Data), "agentID", agentID, "connectionID", resp.ConnectID)
 	frontend, err := s.getFrontend(agentID, resp.ConnectID)
 	if err != nil {
-		klog.ErrorS(err, "could not get frontend client", "agentID", agentID, "connectionID", resp.ConnectID)
+		klog.ErrorS(err, "could not get frontend client; closing conenction", "agentID", agentID, "connectionID", resp.ConnectID)
Data point: in a cluster I'm debugging (with a thrashing controller), I see ~20 of this error log and ~400k of the info-level log below (the CLOSE_RSP case: "could not get frontend client for closing").
So we may need this in that case too.
The close case just means the frontend terminated the connection before the backend responded with the CLOSE_RSP (it could also mean the server initiated the close for some reason). In either case, the backend will have already shut down any connection state after sending a CLOSE_RSP, so no need to respond with a CLOSE_REQ in that case.
That said, it's a good call that in the frontend CLOSE_REQ error cases, we should return a CLOSE_RSP from the server so the client can fail fast. I'll add that as a separate commit.
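A sketch of that fail-fast behavior (not the PR's literal code; the helper name and error text are illustrative): when the server cannot forward a frontend CLOSE_REQ, it answers the frontend with a CLOSE_RSP carrying an error so the client doesn't wait for a response that will never come.

```go
import (
	"k8s.io/klog/v2"

	"sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client"
)

// sendCloseRsp replies to the frontend with an error CLOSE_RSP so it can
// fail fast instead of waiting for a backend that will never answer.
func sendCloseRsp(stream client.ProxyService_ProxyServer, connID int64, reason string) {
	pkt := &client.Packet{
		Type: client.PacketType_CLOSE_RSP,
		Payload: &client.Packet_CloseResponse{
			CloseResponse: &client.CloseResponse{
				ConnectID: connID,
				Error:     reason,
			},
		},
	}
	if err := stream.Send(pkt); err != nil {
		klog.ErrorS(err, "CLOSE_RSP to frontend failed", "connectionID", connID)
	}
}
```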
// As the read side of the recvCh channel, we cannot close it.
// However readFrontendToChannel() may be blocked writing to the channel,
// so we need to consume the channel until it is closed.
for range recvCh {
Is it possible to add unit test coverage?
What is the bad thing this prevents - a leaked goroutine? If so, I suggest being explicit about that in this comment.
Without this, the goroutine that actually receives the packets and queues them up in the channel could be blocked here:
apiserver-network-proxy/pkg/server/server.go, line 422 in 682e779:
case recvCh <- in: // Send didn't block, carry on.
and would never get back to the Recv() call (apiserver-network-proxy/pkg/server/server.go, lines 400 to 401 in 682e779):
in, err := stream.Recv()
if err == io.EOF {
So yes, without this the goroutines could be deadlocked.
Note, this was copied from the agent client, which does the same thing: https://github.com/kubernetes-sigs/apiserver-network-proxy/blob/master/pkg/agent/client.go#L586-L597
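Boiled down, the producer/consumer relationship being described looks roughly like this (a simplified sketch: the real code lives in readFrontendToChannel/serveRecvFrontend, and the channel size and packet handling here are assumptions):

```go
import (
	"sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client"
)

// serveStream collapses both sides of the pattern into one illustrative function.
func serveStream(stream client.ProxyService_ProxyServer) {
	recvCh := make(chan *client.Packet, 10)

	// Producer: reads packets off the gRPC stream and queues them. If nobody
	// ever reads recvCh again, it blocks forever on the channel send and
	// never returns to stream.Recv().
	go func() {
		defer close(recvCh)
		for {
			in, err := stream.Recv()
			if err != nil { // includes io.EOF
				return
			}
			recvCh <- in // blocks when the channel is full
		}
	}()

	// Consumer: drain whatever is left on the way out, so a blocked producer
	// send completes and that goroutine can observe the closed stream and exit.
	defer func() {
		for range recvCh {
		}
	}()

	for pkt := range recvCh {
		if pkt.Type == client.PacketType_CLOSE_REQ {
			// Early return: without the deferred drain above, the producer
			// could be left blocked on its pending send.
			return
		}
		// ... handle other packet types ...
	}
}
```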
I realized that serveRecvBackend also needs the same treatment, and added it there.
EDIT: Actually, this should never happen on the backend side. I'm going to leave the drain there as a safety measure though.
I modified TestServerProxyRecvChanFull to verify that the deadlock condition doesn't occur. It's a bit contrived, but the scenarios where this would happen are when the frontend client is behaving correctly.
Note that getting the test to work required terminating serveRecvFrontend as soon as the CLOSE_REQ was sent to the backend. This change fixed the duplicate close packet.
I think it's fine, but if we're worried we could add a draining-connection metric. Increment when we enter the defer and decrement when we exit. If this gets stuck draining recvCh then the metric will stay up.
> I think it's fine, but if we're worried we could add a draining-connection metric. Increment when we enter the defer and decrement when we exit. If this gets stuck draining recvCh then the metric will stay up.

IMO: keep the logs for now, and soon add a proxy-server metric for proxy connections labeled by state, similar to the client metric.
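A sketch of the suggested gauge (the metric name, namespace, and wiring are hypothetical; nothing like this is added in this PR):

```go
import (
	"github.com/prometheus/client_golang/prometheus"

	"sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client"
)

// drainingConnections counts connections currently draining their receive
// channel; if a drain gets stuck, the gauge stays elevated.
var drainingConnections = prometheus.NewGauge(prometheus.GaugeOpts{
	Namespace: "konnectivity_network_proxy",
	Subsystem: "server",
	Name:      "draining_connections",
	Help:      "Number of connections currently draining their receive channel.",
})

func init() {
	prometheus.MustRegister(drainingConnections)
}

func drainWithMetric(recvCh <-chan *client.Packet) {
	drainingConnections.Inc()
	defer drainingConnections.Dec()
	for range recvCh {
		// Discard; we only drain so the writer goroutine is not left blocked.
	}
}
```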
@@ -339,7 +339,15 @@ func (t *grpcTunnel) serve(tunnelCtx context.Context) {
 	conn, ok := t.conns.get(resp.ConnectID)

 	if !ok {
-		klog.V(1).InfoS("Connection not recognized", "connectionID", resp.ConnectID)
+		klog.ErrorS(nil, "Connection not recognized", "connectionID", resp.ConnectID)
+		t.stream.Send(&client.Packet{
Ideally we would add ObservePacket() and a conditional ObserveStreamError() here.
(See the other 2 stream sends in this file, and note that t.stream is the raw stream.)
Done. We may want to consider switching to a wrapper around the ProxyStream client that records the metrics automatically.
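A sketch of what such a wrapper might look like (observePacket and observeStreamError are placeholders standing in for the project's ObservePacket/ObserveStreamError metrics calls, whose exact signatures aren't shown here):

```go
import (
	"sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client"
)

// metricsStream wraps the raw gRPC stream so every Send records packet and
// stream-error metrics in one place instead of at each call site.
type metricsStream struct {
	client.ProxyService_ProxyClient
}

func (s *metricsStream) Send(pkt *client.Packet) error {
	observePacket(pkt.Type)
	if err := s.ProxyService_ProxyClient.Send(pkt); err != nil {
		observeStreamError(err)
		return err
	}
	return nil
}

// Placeholders for the real metrics helpers.
func observePacket(t client.PacketType) {}
func observeStreamError(err error)      {}
```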
pkg/server/server.go
Outdated
@@ -520,16 +530,24 @@ func (s *ProxyServer) serveRecvFrontend(stream client.ProxyService_ProxyServer,
 	connID := pkt.GetData().ConnectID
 	data := pkt.GetData().Data
 	klog.V(5).InfoS("Received data from connection", "bytes", len(data), "connectionID", connID)
+	if backend == nil {
+		klog.V(2).InfoS("Backend has not been initialized for the connection. Client should send a Dial Request first", "connectionID", connID)
Seems more likely the client sent the data packet on the wrong stream than that they attempted to send a data packet before a dial request.
Agreed. This was copied from the log message under the DATA case, but it's weird in both places. I just removed the second sentence.
/label tide/merge-method-squash
@@ -339,7 +339,20 @@ func (t *grpcTunnel) serve(tunnelCtx context.Context) {
 	conn, ok := t.conns.get(resp.ConnectID)

 	if !ok {
-		klog.V(1).InfoS("Connection not recognized", "connectionID", resp.ConnectID)
+		klog.ErrorS(nil, "Connection not recognized", "connectionID", resp.ConnectID, "packetType", "DATA")
I thought we were taking this back to Info?
The connection ID is only removed from t.conns when receiving a CLOSE_RSP from the server, not for a client-side close. So the only way this would be hit is if the server sent a DATA packet after a CLOSE_RSP, which should never happen.
Note that I did change the CLOSE_RSP version of this log (line 374) to an info log, as there are cases where multiple CLOSE_RSP packets might be sent.
I buy that argument (for now) but it suggests that we only clean up the connection map if we get a CLOSE_RSP packet. That seems like it could represent a possible leak. If we decide to fix that possible leak then we may need to revisit this log message.
I really like the drains, good catch. There are a couple of minor issues. Happy to lgtm/approve as soon as they are fixed.
+1, lots of good improvements here. I'm excited to tag after merging this and observe the agent memory improvement.
/lgtm
[APPROVALNOTIFIER] This PR is APPROVED
This pull-request has been approved by: cheftako, tallclair
The full list of commands accepted by this bot can be found here. The pull request process is described here.
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing …
* [server] Close backend connection when frontend is not found
* [server] Handle non-recoverable frontend errors
* [agent] Handle unrecognized connections
* [agent] clean up typos
* [client] Handle unrecognized connection
* Fix concurrent_test
* Change unknown connection logs to Info level V2
* Log & test drained recv packets
* [server] Return a CLOSE_RSP to the frontend when failing the CLOSE_REQ
* [client] observe close_req metrics
* [server] clean up uninitialized backend log message
* [client] Use Send wrapper for metrics observation
* Fail fast when a DATA packet is missing a connection ID
I don't actually understand how we're getting into this state, but this log line shows up with some frequency:
This is a non-recoverable condition, indicating that somehow the frontend was disconnected but left the backend connection open. When this happens, reply to the backend with a close request.
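Sketched out, the fix looks roughly like this (field and type names follow the snippets quoted in the review, including the server's Backend interface, but this is not the PR's literal code):

```go
import (
	"k8s.io/klog/v2"

	"sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client"
)

// closeBackendConnection tells the backend to tear down a connection whose
// frontend can no longer be found, instead of silently dropping its data.
func closeBackendConnection(backend Backend, agentID string, connID int64) {
	pkt := &client.Packet{
		Type: client.PacketType_CLOSE_REQ,
		Payload: &client.Packet_CloseRequest{
			CloseRequest: &client.CloseRequest{ConnectID: connID},
		},
	}
	if err := backend.Send(pkt); err != nil {
		klog.ErrorS(err, "CLOSE_REQ to backend failed", "agentID", agentID, "connectionID", connID)
	}
}
```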