10000 feat: automatically use websockets if DERP upgrade is unavailable by kylecarbs · Pull Request #6381 · coder/coder · GitHub
[go: up one dir, main page]

Skip to content

feat: automatically use websockets if DERP upgrade is unavailable #6381

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion coderd/coderd.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,9 @@ func New(options *Options) *API {
// replicas or instances of this middleware.
apiRateLimiter := httpmw.RateLimit(options.APIRateLimit, time.Minute)

derpHandler := derphttp.Handler(api.DERPServer)
derpHandler, api.derpCloseFunc = tailnet.WithWebsocketSupport(api.DERPServer, derpHandler)

r.Use(
httpmw.Recover(api.Logger),
tracing.StatusWriterMiddleware,
Expand Down Expand Up @@ -363,7 +366,7 @@ func New(options *Options) *API {
r.Route("/%40{user}/{workspace_and_agent}/apps/{workspaceapp}", apps)
r.Route("/@{user}/{workspace_and_agent}/apps/{workspaceapp}", apps)
r.Route("/derp", func(r chi.Router) {
r.Get("/", derphttp.Handler(api.DERPServer).ServeHTTP)
r.Get("/", derpHandler.ServeHTTP)
// This is used when UDP is blocked, and latency must be checked via HTTP(s).
r.Get("/latency-check", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
Expand Down Expand Up @@ -726,6 +729,7 @@ type API struct {

WebsocketWaitMutex sync.Mutex
WebsocketWaitGroup sync.WaitGroup
derpCloseFunc func()

metricsCache *metricscache.Cache
workspaceAgentCache *wsconncache.Cache
Expand All @@ -739,6 +743,7 @@ type API struct {
// Close waits for all WebSocket connections to drain before returning.
func (api *API) Close() error {
api.cancel()
api.derpCloseFunc()

api.WebsocketWaitMutex.Lock()
api.WebsocketWaitGroup.Wait()
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ replace github.com/tcnksm/go-httpstat => github.com/kylecarbs/go-httpstat v0.0.0

// There are a few minor changes we make to Tailscale that we're slowly upstreaming. Compare here:
// https://github.com/tailscale/tailscale/compare/main...coder:tailscale:main
replace tailscale.com => github.com/coder/tailscale v1.1.1-0.20230210022917-446fc10e755a
replace tailscale.com => github.com/coder/tailscale v1.1.1-0.20230301203426-fb16ae7c5bba

// Switch to our fork that imports fixes from http://github.com/tailscale/ssh.
// See: https://github.com/coder/coder/issues/3371
Expand Down
28 changes: 28 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,34 @@ github.com/coder/ssh v0.0.0-20220811105153-fcea99919338 h1:tN5GKFT68YLVzJoA8AHui
github.com/coder/ssh v0.0.0-20220811105153-fcea99919338/go.mod h1:ZSS+CUoKHDrqVakTfTWUlKSr9MtMFkC4UvtQKD7O914=
github.com/coder/tailscale v1.1.1-0.20230210022917-446fc10e755a h1:fTXoK+Yikz8Rl0v0nHxO+lTV0y8Q7wdmGFq1CqLgznE=
github.com/coder/tailscale v1.1.1-0.20230210022917-446fc10e755a/go.mod h1:jpg+77g19FpXL43U1VoIqoSg1K/Vh5CVxycGldQ8KhA=
github.com/coder/tailscale v1.1.1-0.20230228183907-71829ca8456b h1:i8R1RwDqLihavxsaTovKCax45AJLy2dtXI0PcLm+BnM=
github.com/coder/tailscale v1.1.1-0.20230228183907-71829ca8456b/go.mod h1:jpg+77g19FpXL43U1VoIqoSg1K/Vh5CVxycGldQ8KhA=
github.com/coder/tailscale v1.1.1-0.20230228184228-8e7815556b85 h1:tg89NZBDe91wWTeII4jFThHdB6USE3mBNXzK0/CWODM=
github.com/coder/tailscale v1.1.1-0.20230228184228-8e7815556b85/go.mod h1:jpg+77g19FpXL43U1VoIqoSg1K/Vh5CVxycGldQ8KhA=
github.com/coder/tailscale v1.1.1-0.20230228184330-c2cd7d4f5a70 h1:gGevbQLqz4fWk5HytFodNe/Bl8rqgHb2+YsRoaZPf1g=
github.com/coder/tailscale v1.1.1-0.20230228184330-c2cd7d4f5a70/go.mod h1:jpg+77g19FpXL43U1VoIqoSg1K/Vh5CVxycGldQ8KhA=
github.com/coder/tailscale v1.1.1-0.20230228184508-acb075f559ce h1:oAYRHRM8/rPrQhcPiXBkLw/YU/NyPhhN+HypRJWSq2k=
github.com/coder/tailscale v1.1.1-0.20230228184508-acb075f559ce/go.mod h1:jpg+77g19FpXL43U1VoIqoSg1K/Vh5CVxycGldQ8KhA=
github.com/coder/tailscale v1.1.1-0.20230228184618-0e3e4965c416 h1:25uoFv5YNvve9r0BEGJAJHVWulyPmPPvXp6S3b0085o=
github.com/coder/tailscale v1.1.1-0.20230228184618-0e3e4965c416/go.mod h1:jpg+77g19FpXL43U1VoIqoSg1K/Vh5CVxycGldQ8KhA=
github.com/coder/tailscale v1.1.1-0.20230228185337-873afd546fb2 h1:yrKQPYAadmtrj5J33Fs/Ldx3FgXCfd0gUbcFqjpsq+I=
github.com/coder/tailscale v1.1.1-0.20230228185337-873afd546fb2/go.mod h1:jpg+77g19FpXL43U1VoIqoSg1K/Vh5CVxycGldQ8KhA=
github.com/coder/tailscale v1.1.1-0.20230228185607-b6042b55609a h1:aqp8CUtVBcCLDI369ORzbVt/pfOt0mTZBxIAejgZkgY=
github.com/coder/tailscale v1.1.1-0.20230228185607-b6042b55609a/go.mod h1:jpg+77g19FpXL43U1VoIqoSg1K/Vh5CVxycGldQ8KhA=
github.com/coder/tailscale v1.1.1-0.20230228194335-31045cf53c85 h1:MHJDOehTwolaeT8FQrBQ5pNEdWXO+8pWTy5ODIYOZUU=
github.com/coder/tailscale v1.1.1-0.20230228194335-31045cf53c85/go.mod h1:jpg+77g19FpXL43U1VoIqoSg1K/Vh5CVxycGldQ8KhA=
github.com/coder/tailscale v1.1.1-0.20230228194714-66eac1a43e5c h1:WylIWZ+8UpiW2NPTu6z2sgFHwqo9/XmT/QTNWT47qic=
github.com/coder/tailscale v1.1.1-0.20230228194714-66eac1a43e5c/go.mod h1:jpg+77g19FpXL43U1VoIqoSg1K/Vh5CVxycGldQ8KhA=
github.com/coder/tailscale v1.1.1-0.20230228200853-ddedcafad3a1 h1:XgOpzrEiDR4GzoKE4dhqopZAg9MrG1cXPMc9/PXA6kk=
github.com/coder/tailscale v1.1.1-0.20230228200853-ddedcafad3a1/go.mod h1:jpg+77g19FpXL43U1VoIqoSg1K/Vh5CVxycGldQ8KhA=
github.com/coder/tailscale v1.1.1-0.20230228204054-70dbfd7be018 h1:pv80tzqexcjrP/4CxFpalrk6bS2XJ2xOGA00u1WUy+g=
github.com/coder/tailscale v1.1.1-0.20230228204054-70dbfd7be018/go.mod h1:jpg+77g19FpXL43U1VoIqoSg1K/Vh5CVxycGldQ8KhA=
github.com/coder/tailscale v1.1.1-0.20230228215233-ba13d5ceee2a h1:eXE/5hMkcHfa3dcJJ4WkseSbJNU4thMfC8cUMIAgi2s=
github.com/coder/tailscale v1.1.1-0.20230228215233-ba13d5ceee2a/go.mod h1:jpg+77g19FpXL43U1VoIqoSg1K/Vh5CVxycGldQ8KhA=
github.com/coder/tailscale v1.1.1-0.20230228220447-d27e5d3056e9 h1:bFcFXLUUi+cwgOqrjbXN+XmI7QvB1up/UZoNF1+9nuM=
github.com/coder/tailscale v1.1.1-0.20230228220447-d27e5d3056e9/go.mod h1:jpg+77g19FpXL43U1VoIqoSg1K/Vh5CVxycGldQ8KhA=
github.com/coder/tailscale v1.1.1-0.20230301203426-fb16ae7c5bba h1:JOD5pqNtiz9lkSX74PY2BJOyNqsBmvGUjFGIuECtG+o=
github.com/coder/tailscale v1.1.1-0.20230301203426-fb16ae7c5bba/go.mod h1:jpg+77g19FpXL43U1VoIqoSg1K/Vh5CVxycGldQ8KhA=
github.com/coder/terraform-provider-coder v0.6.14 h1:NsJ1mo0MN1x/VyNLYmsgPUYn2JgzdVNZBqnj9OSIDgY=
github.com/coder/terraform-provider-coder v0.6.14/go.mod h1:UIfU3bYNeSzJJvHyJ30tEKjD6Z9utloI+HUM/7n94CY=
github.com/containerd/aufs v0.0.0-20200908144142-dab0cbea06f4/go.mod h1:nukgQABAEopAHvB6j7cnP5zJ+/3aVcE7hCYqvIwAHyE=
Expand Down
68 changes: 42 additions & 26 deletions tailnet/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,19 +192,20 @@ func NewConn(options *Options) (conn *Conn, err error) {
wireguardEngine.SetFilter(filter.New(netMap.PacketFilter, localIPs, logIPs, nil, Logger(options.Logger.Named("packet-filter"))))
dialContext, dialCancel := context.WithCancel(context.Background())
server := &Conn{
blockEndpoints: options.BlockEndpoints,
dialContext: dialContext,
dialCancel: dialCancel,
closed: make(chan struct{}),
logger: options.Logger,
magicConn: magicConn,
dialer: dialer,
listeners: map[listenKey]*listener{},
peerMap: map[tailcfg.NodeID]*tailcfg.Node{},
tunDevice: tunDevice,
netMap: netMap,
netStack: netStack,
wireguardMonitor: wireguardMonitor,
blockEndpoints: options.BlockEndpoints,
dialContext: dialContext,
dialCancel: dialCancel,
closed: make(chan struct{}),
logger: options.Logger,
magicConn: magicConn,
dialer: dialer,
listeners: map[listenKey]*listener{},
peerMap: map[tailcfg.NodeID]*tailcfg.Node{},
lastDERPForcedWebsockets: map[int]string{},
tunDevice: tunDevice,
netMap: netMap,
netStack: netStack,
wireguardMonitor: wireguardMonitor,
wireguardRouter: &router.Config{
LocalAddrs: netMap.Addresses,
},
Expand Down Expand Up @@ -247,6 +248,17 @@ func NewConn(options *Options) (conn *Conn, err error) {
server.lastMutex.Unlock()
server.sendNode()
})
magicConn.SetDERPForcedWebsocketCallback(func(region int, reason string) {
server.logger.Debug(context.Background(), "derp forced websocket", slog.F("region", region), slog.F("reason", reason))
server.lastMutex.Lock()
if server.lastDERPForcedWebsockets[region] == reason {
server.lastMutex.Unlock()
return
}
server.lastDERPForcedWebsockets[region] = reason
server.lastMutex.Unlock()
server.sendNode()
})
netStack.ForwardTCPIn = server.forwardTCP

err = netStack.Start(nil)
Expand Down Expand Up @@ -299,10 +311,11 @@ type Conn struct {
nodeChanged bool
// It's only possible to store these values via status functions,
// so the values must be stored for retrieval later on.
lastStatus time.Time
lastEndpoints []tailcfg.Endpoint
lastNetInfo *tailcfg.NetInfo
nodeCallback func(node *Node)
lastStatus time.Time
lastEndpoints []tailcfg.Endpoint
lastDERPForcedWebsockets map[int]string
lastNetInfo *tailcfg.NetInfo
nodeCallback func(node *Node)

trafficStats *connstats.Statistics
}
Expand Down Expand Up @@ -632,21 +645,24 @@ func (c *Conn) selfNode() *Node {
}
var preferredDERP int
var derpLatency map[string]float64
var derpForcedWebsocket map[int]string
if c.lastNetInfo != nil {
preferredDERP = c.lastNetInfo.PreferredDERP
derpLatency = c.lastNetInfo.DERPLatency
derpForcedWebsocket = c.lastDERPForcedWebsockets
}

node := &Node{
ID: c.netMap.SelfNode.ID,
AsOf: database.Now(),
Key: c.netMap.SelfNode.Key,
Addresses: c.netMap.SelfNode.Addresses,
AllowedIPs: c.netMap.SelfNode.AllowedIPs,
DiscoKey: c.magicConn.DiscoPublicKey(),
Endpoints: endpoints,
PreferredDERP: preferredDERP,
DERPLatency: derpLatency,
ID: c.netMap.SelfNode.ID,
AsOf: database.Now(),
Key: c.netMap.SelfNode.Key,
Addresses: c.netMap.SelfNode.Addresses,
AllowedIPs: c.netMap.SelfNode.AllowedIPs,
DiscoKey: c.magicConn.DiscoPublicKey(),
Endpoints: endpoints,
PreferredDERP: preferredDERP,
DERPLatency: derpLatency,
DERPForcedWebsocket: derpForcedWebsocket,
}
if c.blockEndpoints {
node.Endpoints = nil
Expand Down
83 changes: 82 additions & 1 deletion tailnet/conn_test.go
Original file line number Diff line number Diff F438 line change
Expand Up @@ -13,6 +13,7 @@ import (
"cdr.dev/slog/sloggers/slogtest"
"github.com/coder/coder/tailnet"
"github.com/coder/coder/tailnet/tailnettest"
"github.com/coder/coder/testutil"
)

func TestMain(m *testing.M) {
Expand Down Expand Up @@ -63,7 +64,7 @@ func TestTailnet(t *testing.T) {
assert.NoError(t, err)
})
require.True(t, w2.AwaitReachable(context.Background(), w1IP))
conn := make(chan struct{})
conn := make(chan struct{}, 1)
go func() {
listener, err := w1.Listen("tcp", ":35565")
assert.NoError(t, err)
Expand All @@ -81,6 +82,86 @@ func TestTailnet(t *testing.T) {
_ = nc.Close()
<-conn

nodes := make(chan *tailnet.Node, 1)
w2.SetNodeCallback(func(node *tailnet.Node) {
select {
case nodes <- node:
default:
}
})
node := <-nodes
// Ensure this connected over DERP!
require.Len(t, node.DERPForcedWebsocket, 0)

w1.Close()
w2.Close()
})

t.Run("ForcesWebSockets", func(t *testing.T) {
t.Parallel()
ctx, cancelFunc := testutil.Context(t)
defer cancelFunc()

w1IP := tailnet.IP()
derpMap := tailnettest.RunDERPOnlyWebSockets(t)
w1, err := tailnet.NewConn(&tailnet.Options{
Addresses: []netip.Prefix{netip.PrefixFrom(w1IP, 128)},
Logger: logger.Named("w1"),
DERPMap: derpMap,
BlockEndpoints: true,
})
require.NoError(t, err)

w2, err := tailnet.NewConn(&tailnet.Options{
Addresses: []netip.Prefix{netip.PrefixFrom(tailnet.IP(), 128)},
Logger: logger.Named("w2"),
DERPMap: derpMap,
BlockEndpoints: true,
})
require.NoError(t, err)
t.Cleanup(func() {
_ = w1.Close()
_ = w2.Close()
})
w1.SetNodeCallback(func(node *tailnet.Node) {
err := w2.UpdateNodes([]*tailnet.Node{node}, false)
assert.NoError(t, err)
})
w2.SetNodeCallback(func(node *tailnet.Node) {
err := w1.UpdateNodes([]*tailnet.Node{node}, false)
assert.NoError(t, err)
})
require.True(t, w2.AwaitReachable(ctx, w1IP))
conn := make(chan struct{}, 1)
go func() {
listener, err := w1.Listen("tcp", ":35565")
assert.NoError(t, err)
defer listener.Close()
nc, err := listener.Accept()
if !assert.NoError(t, err) {
return
}
_ = nc.Close()
conn <- struct{}{}
}()

nc, err := w2.DialContextTCP(ctx, netip.AddrPortFrom(w1IP, 35565))
require.NoError(t, err)
_ = nc.Close()
<-conn

nodes := make(chan *tailnet.Node, 1)
w2.SetNodeCallback(func(node *tailnet.Node) {
select {
case nodes <- node:
default:
}
})
node := <-nodes
require.Len(t, node.DERPForcedWebsocket, 1)
// Ensure the reason is valid!
require.Equal(t, "GET failed with status code 400: Invalid \"Upgrade\" he 10000 ader: DERP", node.DERPForcedWebsocket[derpMap.RegionIDs()[0]])

w1.Close()
w2.Close()
})
Expand Down
5 changes: 5 additions & 0 deletions tailnet/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ type Node struct {
PreferredDERP int `json:"preferred_derp"`
// DERPLatency is the latency in seconds to each DERP server.
DERPLatency map[string]float64 `json:"derp_latency"`
// DERPForcedWebsocket contains a mapping of DERP regions to
// error messages that caused the connection to be forced to
// use WebSockets. We don't use WebSockets by default because
// they are less performant.
DERPForcedWebsocket map[int]string `json:"derp_forced_websockets"`
// Addresses are the IP address ranges this connection exposes.
Addresses []netip.Prefix `json:"addresses"`
// AllowedIPs specify what addresses can dial the connection.
Expand Down
72 changes: 72 additions & 0 deletions tailnet/derp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package tailnet

import (
"bufio"
"context"
"log"
"net/http"
"strings"
"sync"

"nhooyr.io/websocket"
"tailscale.com/derp"
"tailscale.com/net/wsconn"
)

// WithWebsocketSupport returns an http.Handler that upgrades
// connections to the "derp" subprotocol to WebSockets and
// passes them to the DERP server.
// Taken from: https://github.com/tailscale/tailscale/blob/e3211ff88ba85435f70984cf67d9b353f3d650d8/cmd/derper/websocket.go#L21
func WithWebsocketSupport(s *derp.Server, base http.Handler) (http.Handler, func()) {
var mu sync.Mutex
var waitGroup sync.WaitGroup
ctx, cancelFunc := context.WithCancel(context.Background())

return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
up := strings.ToLower(r.Header.Get("Upgrade"))

// Very early versions of Tailscale set "Upgrade: WebSocket" but didn't actually
// speak WebSockets (they still assumed DERP's binary framing). So to distinguish
// clients that actually want WebSockets, look for an explicit "derp" subprotocol.
if up != "websocket" || !strings.Contains(r.Header.Get("Sec-Websocket-Protocol"), "derp") {
base.ServeHTTP(w, r)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we log here to better understand why it may not be working? (E.g. future bugs.)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so because this could be spammy.

return
}

mu.Lock()
if ctx.Err() != nil {
mu.Unlock()
return
}
waitGroup.Add(1)
mu.Unlock()
defer waitGroup.Done()
c, err := websocket.Accept(w, r, &websocket.AcceptOptions{
Subprotocols: []string{"derp"},
OriginPatterns: []string{"*"},
// Disable compression because we transmit WireGuard messages that
// are not compressible.
// Additionally, Safari has a broken implementation of compression
// (see https://github.com/nhooyr/websocket/issues/218) that makes
// enabling it actively harmful.
CompressionMode: websocket.CompressionDisabled,
})
if err != nil {
log.Printf("websocket.Accept: %v", err)
return
}
defer c.Close(websocket.StatusInternalError, "closing")
if c.Subprotocol() != "derp" {
c.Close(websocket.StatusPolicyViolation, "client must speak the derp subprotocol")
return
}
wc := wsconn.NetConn(ctx, c, websocket.MessageBinary)
brw := bufio.NewReadWriter(bufio.NewReader(wc), bufio.NewWriter(wc))
s.Accept(ctx, wc, brw, r.RemoteAddr)
}), func() {
cancelFunc()
mu.Lock()
waitGroup.Wait()
mu.Unlock()
}
}
Loading
0