From 877959bc15be9745535835bf22594a1cdbda81cd Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Thu, 2 Jun 2022 15:32:41 +0300 Subject: [PATCH 1/6] fix: Fix goroutine leak by propagating websocket closure Fixes #1508 --- coderd/workspaceagents.go | 60 +++++++++++++++++++++++++++++++++------ 1 file changed, 52 insertions(+), 8 deletions(-) diff --git a/coderd/workspaceagents.go b/coderd/workspaceagents.go index 90396235993cb..7b29577cd9eba 100644 --- a/coderd/workspaceagents.go +++ b/coderd/workspaceagents.go @@ -1,8 +1,10 @@ package coderd import ( + "context" "database/sql" "encoding/json" + "errors" "fmt" "io" "net" @@ -16,6 +18,7 @@ import ( "nhooyr.io/websocket" "cdr.dev/slog" + "github.com/coder/coder/agent" "github.com/coder/coder/coderd/database" "github.com/coder/coder/coderd/httpapi" @@ -324,16 +327,16 @@ func (api *API) workspaceAgentTurn(rw http.ResponseWriter, r *http.Request) { }) return } - defer func() { - _ = wsConn.Close(websocket.StatusNormalClosure, "") - }() - netConn := websocket.NetConn(r.Context(), wsConn, websocket.MessageBinary) - api.Logger.Debug(r.Context(), "accepting turn connection", slog.F("remote-address", r.RemoteAddr), slog.F("local-address", localAddress)) + + ctx, wsNetConn := websocketNetConn(r.Context(), wsConn, websocket.MessageBinary) + defer wsNetConn.Close() // Also closes conn. + + api.Logger.Debug(ctx, "accepting turn connection", slog.F("remote-address", r.RemoteAddr), slog.F("local-address", localAddress)) select { - case <-api.TURNServer.Accept(netConn, remoteAddress, localAddress).Closed(): - case <-r.Context().Done(): + case <-api.TURNServer.Accept(wsNetConn, remoteAddress, localAddress).Closed(): + case <-ctx.Done(): } - api.Logger.Debug(r.Context(), "completed turn connection", slog.F("remote-address", r.RemoteAddr), slog.F("local-address", localAddress)) + api.Logger.Debug(ctx, "completed turn connection", slog.F("remote-address", r.RemoteAddr), slog.F("local-address", localAddress)) } // workspaceAgentPTY spawns a PTY and pipes it over a WebSocket. @@ -515,3 +518,44 @@ func convertWorkspaceAgent(dbAgent database.WorkspaceAgent, agentUpdateFrequency return workspaceAgent, nil } + +// wsNetConn wraps net.Conn created by websocket.NetConn(). Cancel func +// is called if io.EOF is encountered. +type wsNetConn struct { + cancel context.CancelFunc + net.Conn +} + +func (c *wsNetConn) Read(b []byte) (n int, err error) { + n, err = c.Conn.Read(b) + if errors.Is(err, io.EOF) { + c.cancel() + } + return n, err +} + +func (c *wsNetConn) Write(b []byte) (n int, err error) { + n, err = c.Conn.Write(b) + if errors.Is(err, io.EOF) { + c.cancel() + } + return n, err +} + +func (c *wsNetConn) Close() error { + defer c.cancel() + return c.Conn.Close() +} + +// websocketNetConn wraps websocket.NetConn and returns a context that +// is tied to the parent context and the lifetime of the conn. A io.EOF +// error during read or write will cancel the context, but not close the +// conn. Close should be called to release context resources. +func websocketNetConn(ctx context.Context, conn *websocket.Conn, msgType websocket.MessageType) (context.Context, net.Conn) { + ctx, cancel := context.WithCancel(ctx) + nc := websocket.NetConn(ctx, conn, msgType) + return ctx, &wsNetConn{ + cancel: cancel, + Conn: nc, + } +} From 0a354b6c63b38b0b9b0ec7709c36687fe2aff1bd Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Thu, 2 Jun 2022 15:35:42 +0300 Subject: [PATCH 2/6] fix: Use of r.Context() in workspaceAgentDial --- coderd/workspaceagents.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/coderd/workspaceagents.go b/coderd/workspaceagents.go index 7b29577cd9eba..6f1e610fc046d 100644 --- a/coderd/workspaceagents.go +++ b/coderd/workspaceagents.go @@ -72,17 +72,18 @@ func (api *API) workspaceAgentDial(rw http.ResponseWriter, r *http.Request) { }) return } - defer func() { - _ = conn.Close(websocket.StatusNormalClosure, "") - }() + + ctx, wsNetConn := websocketNetConn(r.Context(), conn, websocket.MessageBinary) + defer wsNetConn.Close() // Also closes conn. + config := yamux.DefaultConfig() config.LogOutput = io.Discard - session, err := yamux.Server(websocket.NetConn(r.Context(), conn, websocket.MessageBinary), config) + session, err := yamux.Server(wsNetConn, config) if err != nil { _ = conn.Close(websocket.StatusAbnormalClosure, err.Error()) return } - err = peerbroker.ProxyListen(r.Context(), session, peerbroker.ProxyOptions{ + err = peerbroker.ProxyListen(ctx, session, peerbroker.ProxyOptions{ ChannelID: workspaceAgent.ID.String(), Logger: api.Logger.Named("peerbroker-proxy-dial"), Pubsub: api.Pubsub, From e0fb30ca0ee1ea5b9f076ac736a41636734c02d2 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Thu, 2 Jun 2022 15:37:04 +0300 Subject: [PATCH 3/6] fix: Use of rw and r.Context() in workspaceAgentListen --- coderd/workspaceagents.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/coderd/workspaceagents.go b/coderd/workspaceagents.go index 6f1e610fc046d..b5e8ca1bcb4a2 100644 --- a/coderd/workspaceagents.go +++ b/coderd/workspaceagents.go @@ -197,13 +197,12 @@ func (api *API) workspaceAgentListen(rw http.ResponseWriter, r *http.Request) { return } - defer func() { - _ = conn.Close(websocket.StatusNormalClosure, "") - }() + ctx, wsNetConn := websocketNetConn(r.Context(), conn, websocket.MessageBinary) + defer wsNetConn.Close() // Also closes conn. config := yamux.DefaultConfig() config.LogOutput = io.Discard - session, err := yamux.Server(websocket.NetConn(r.Context(), conn, websocket.MessageBinary), config) + session, err := yamux.Server(wsNetConn, config) if err != nil { _ = conn.Close(websocket.StatusAbnormalClosure, err.Error()) return @@ -233,7 +232,7 @@ func (api *API) workspaceAgentListen(rw http.ResponseWriter, r *http.Request) { } disconnectedAt := workspaceAgent.DisconnectedAt updateConnectionTimes := func() error { - err = api.Database.UpdateWorkspaceAgentConnectionByID(r.Context(), database.UpdateWorkspaceAgentConnectionByIDParams{ + err = api.Database.UpdateWorkspaceAgentConnectionByID(ctx, database.UpdateWorkspaceAgentConnectionByIDParams{ ID: workspaceAgent.ID, FirstConnectedAt: firstConnectedAt, LastConnectedAt: lastConnectedAt, @@ -259,7 +258,7 @@ func (api *API) workspaceAgentListen(rw http.ResponseWriter, r *http.Request) { return } - api.Logger.Info(r.Context(), "accepting agent", slog.F("resource", resource), slog.F("agent", workspaceAgent)) + api.Logger.Info(ctx, "accepting agent", slog.F("resource", resource), slog.F("agent", workspaceAgent)) ticker := time.NewTicker(api.AgentConnectionUpdateFrequency) defer ticker.Stop() From e90a0c0c714792622a11985dbb5b287d705d1750 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Thu, 2 Jun 2022 15:38:51 +0300 Subject: [PATCH 4/6] fix: Use of r.Context() in workspaceAgentPTY --- coderd/workspaceagents.go | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/coderd/workspaceagents.go b/coderd/workspaceagents.go index b5e8ca1bcb4a2..6d14258c4217d 100644 --- a/coderd/workspaceagents.go +++ b/coderd/workspaceagents.go @@ -387,12 +387,12 @@ func (api *API) workspaceAgentPTY(rw http.ResponseWriter, r *http.Request) { }) return } - defer func() { - _ = conn.Close(websocket.StatusNormalClosure, "ended") - }() + // Accept text connections, because it's more developer friendly. - wsNetConn := websocket.NetConn(r.Context(), conn, websocket.MessageBinary) - agentConn, err := api.dialWorkspaceAgent(r, workspaceAgent.ID) + ctx, wsNetConn := websocketNetConn(r.Context(), conn, websocket.MessageBinary) + defer wsNetConn.Close() // Also closes conn. + + agentConn, err := api.dialWorkspaceAgent(ctx, r, workspaceAgent.ID) if err != nil { _ = conn.Close(websocket.StatusInternalError, httpapi.WebsocketCloseSprintf("dial workspace agent: %s", err)) return @@ -411,11 +411,13 @@ func (api *API) workspaceAgentPTY(rw http.ResponseWriter, r *http.Request) { _, _ = io.Copy(ptNetConn, wsNetConn) } -// dialWorkspaceAgent connects to a workspace agent by ID. -func (api *API) dialWorkspaceAgent(r *http.Request, agentID uuid.UUID) (*agent.Conn, error) { +// dialWorkspaceAgent connects to a workspace agent by ID. Only rely on +// r.Context() for cancellation if it's use is safe or r.Hijack() has +// not been performed. +func (api *API) dialWorkspaceAgent(ctx context.Context, r *http.Request, agentID uuid.UUID) (*agent.Conn, error) { client, server := provisionersdk.TransportPipe() go func() { - _ = peerbroker.ProxyListen(r.Context(), server, peerbroker.ProxyOptions{ + _ = peerbroker.ProxyListen(ctx, server, peerbroker.ProxyOptions{ ChannelID: agentID.String(), Logger: api.Logger.Named("peerbroker-proxy-dial"), Pubsub: api.Pubsub, @@ -425,7 +427,7 @@ func (api *API) dialWorkspaceAgent(r *http.Request, agentID uuid.UUID) (*agent.C }() peerClient := proto.NewDRPCPeerBrokerClient(provisionersdk.Conn(client)) - stream, err := peerClient.NegotiateConnection(r.Context()) + stream, err := peerClient.NegotiateConnection(ctx) if err != nil { return nil, xerrors.Errorf("negotiate: %w", err) } @@ -437,7 +439,7 @@ func (api *API) dialWorkspaceAgent(r *http.Request, agentID uuid.UUID) (*agent.C options.SettingEngine.SetICEProxyDialer(turnconn.ProxyDialer(func() (c net.Conn, err error) { clientPipe, serverPipe := net.Pipe() go func() { - <-r.Context().Done() + <-ctx.Done() _ = clientPipe.Close() _ = serverPipe.Close() }() From e0a2f8edf273cd26034d2e9fc838e9b02cfbf453 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Thu, 2 Jun 2022 16:31:36 +0300 Subject: [PATCH 5/6] chore: Fix PR comments --- coderd/workspaceagents.go | 1 - 1 file changed, 1 deletion(-) diff --git a/coderd/workspaceagents.go b/coderd/workspaceagents.go index 6d14258c4217d..6f7c792527d6c 100644 --- a/coderd/workspaceagents.go +++ b/coderd/workspaceagents.go @@ -388,7 +388,6 @@ func (api *API) workspaceAgentPTY(rw http.ResponseWriter, r *http.Request) { return } - // Accept text connections, because it's more developer friendly. ctx, wsNetConn := websocketNetConn(r.Context(), conn, websocket.MessageBinary) defer wsNetConn.Close() // Also closes conn. From 9e6727e665c3684cf8fb65ab373083c50a1ea51f Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Thu, 2 Jun 2022 16:37:37 +0300 Subject: [PATCH 6/6] fix: wsNetConn cancel on any error --- coderd/workspaceagents.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/coderd/workspaceagents.go b/coderd/workspaceagents.go index 6f7c792527d6c..f26ebe92d8283 100644 --- a/coderd/workspaceagents.go +++ b/coderd/workspaceagents.go @@ -4,7 +4,6 @@ import ( "context" "database/sql" "encoding/json" - "errors" "fmt" "io" "net" @@ -521,7 +520,7 @@ func convertWorkspaceAgent(dbAgent database.WorkspaceAgent, agentUpdateFrequency } // wsNetConn wraps net.Conn created by websocket.NetConn(). Cancel func -// is called if io.EOF is encountered. +// is called if a read or write error is encountered. type wsNetConn struct { cancel context.CancelFunc net.Conn @@ -529,7 +528,7 @@ type wsNetConn struct { func (c *wsNetConn) Read(b []byte) (n int, err error) { n, err = c.Conn.Read(b) - if errors.Is(err, io.EOF) { + if err != nil { c.cancel() } return n, err @@ -537,7 +536,7 @@ func (c *wsNetConn) Read(b []byte) (n int, err error) { func (c *wsNetConn) Write(b []byte) (n int, err error) { n, err = c.Conn.Write(b) - if errors.Is(err, io.EOF) { + if err != nil { c.cancel() } return n, err @@ -549,8 +548,8 @@ func (c *wsNetConn) Close() error { } // websocketNetConn wraps websocket.NetConn and returns a context that -// is tied to the parent context and the lifetime of the conn. A io.EOF -// error during read or write will cancel the context, but not close the +// is tied to the parent context and the lifetime of the conn. Any error +// during read or write will cancel the context, but not close the // conn. Close should be called to release context resources. func websocketNetConn(ctx context.Context, conn *websocket.Conn, msgType websocket.MessageType) (context.Context, net.Conn) { ctx, cancel := context.WithCancel(ctx)