8000 feat: Add speedtest command for tailnet · coder/coder@f1ff169 · GitHub
[go: up one dir, main page]

Skip to content

Commit f1ff169

Browse files
committed
feat: Add speedtest command for tailnet
1 parent 1a5d3ea commit f1ff169

File tree

8 files changed

+205
-5
lines changed

8 files changed

+205
-5
lines changed

agent/agent.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"go.uber.org/atomic"
3030
gossh "golang.org/x/crypto/ssh"
3131
"golang.org/x/xerrors"
32+
"tailscale.com/net/speedtest"
3233
"tailscale.com/tailcfg"
3334

3435
"cdr.dev/slog"
@@ -58,6 +59,7 @@ var (
5859
tailnetIP = netip.MustParseAddr("fd7a:115c:a1e0:49d6:b259:b7ac:b1b2:48f4")
5960
tailnetSSHPort = 1
6061
tailnetReconnectingPTYPort = 2
62+
tailnetSpeedtestPort = 3
6163
)
6264

6365
type Options struct {
@@ -256,6 +258,24 @@ func (a *agent) runTailnet(ctx context.Context, derpMap *tailcfg.DERPMap) {
256258
go a.handleReconnectingPTY(ctx, msg, conn)
257259
}
258260
}()
261+
speedtestListener, err := a.network.Listen("tcp", ":"+strconv.Itoa(tailnetSpeedtestPort))
262+
if err != nil {
263+
a.logger.Critical(ctx, "listen for speedtest", slog.Error(err))
264+
return
265+
}
266+
a.connCloseWait.Add(1)
267+
go func() {
268+
defer a.connCloseWait.Done()
269+
for {
270+
err := speedtest.Serve(speedtestListener)
271+
if err == nil {
272+
break
273+
}
274+
if err != nil {
275+
a.logger.Debug(ctx, "speedtest exited", slog.Error(err))
276+
}
277+
}
278+
}()
259279
}
260280

261281
// runCoordinator listens for nodes and updates the self-node as it changes.

agent/agent_test.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"time"
2020

2121
"golang.org/x/xerrors"
22+
"tailscale.com/net/speedtest"
2223
"tailscale.com/tailcfg"
2324

2425
scp "github.com/bramvdbogaerde/go-scp"
@@ -547,6 +548,21 @@ func TestAgent(t *testing.T) {
547548
return err == nil
548549
}, testutil.WaitMedium, testutil.IntervalFast)
549550
})
551+
552+
t.Run("Speedtest", func(t *testing.T) {
553+
t.Parallel()
554+
if testing.Short() {
555+
t.Skip("The minimum duration for a speedtest is hardcoded in Tailscale to 5s!")
556+
}
557+
derpMap := tailnettest.RunDERPAndSTUN(t)
558+
conn, _ := setupAgent(t, agent.Metadata{
559+
DERPMap: derpMap,
560+
}, 0)
561+
defer conn.Close()
562+
res, err := conn.Speedtest(speedtest.Upload, speedtest.MinDuration)
563+
require.NoError(t, err)
564+
t.Logf("%.2f MBits/s", res[len(res)-1].MBitsPerSecond())
565+
})
550566
}
551567

552568
func setupSSHCommand(t *testing.T, beforeArgs []string, afterArgs []string) *exec.Cmd {

agent/conn.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"golang.org/x/crypto/ssh"
1717
"golang.org/x/xerrors"
1818
"tailscale.com/ipn/ipnstate"
19+
"tailscale.com/net/speedtest"
1920
"tailscale.com/tailcfg"
2021

2122
"github.com/coder/coder/peer"
@@ -39,6 +40,7 @@ type Conn interface {
3940
CloseWithError(err error) error
4041
ReconnectingPTY(id string, height, width uint16, command string) (net.Conn, error)
4142
SSH() (net.Conn, error)
43+
Speedtest(direction speedtest.Direction, duration time.Duration) ([]speedtest.Result, error)
4244
SSHClient() (*ssh.Client, error)
4345
DialContext(ctx context.Context, network string, addr string) (net.Conn, error)
4446
}
@@ -77,6 +79,10 @@ func (c *WebRTCConn) SSH() (net.Conn, error) {
7779
return channel.NetConn(), nil
7880
}
7981

82+
func (*WebRTCConn) Speedtest(_ speedtest.Direction, _ time.Duration) ([]speedtest.Result, error) {
83+
return nil, xerrors.New("not implemented")
84+
}
85+
8086
// SSHClient calls SSH to create a client that uses a weak cipher
8187
// for high throughput.
8288
func (c *WebRTCConn) SSHClient() (*ssh.Client, error) {
@@ -227,6 +233,47 @@ func (c *TailnetConn) SSHClient() (*ssh.Client, error) {
227233
return ssh.NewClient(sshConn, channels, requests), nil
228234
}
229235

236+
func (c *TailnetConn) Speedtest(direction speedtest.Direction, duration time.Duration) ([]speedtest.Result, error) {
237+
listener, err := net.Listen("tcp", "127.0.0.1:0")
238+
if err != nil {
239+
return nil, err
240+
}
241+
defer listener.Close()
242+
tcpAddr, ok := listener.Addr().(*net.TCPAddr)
243+
if !ok {
244+
return nil, xerrors.New("Address wasn't TCP!")
245+
}
246+
errs := make(chan error, 1)
247+
closed := make(chan struct{})
248+
go func() {
249+
defer close(closed)
250+
ctx, cancelFunc := context.WithCancel(context.Background())
251+
defer cancelFunc()
252+
for {
253+
conn, err := listener.Accept()
254+
if err != nil {
255+
return
256+
}
257+
speedConn, err := c.DialContextTCP(ctx, netip.AddrPortFrom(tailnetIP, uint16(tailnetSpeedtestPort)))
258+
if err != nil {
259+
errs <- err
260+
return
261+
}
262+
go Bicopy(ctx, conn, speedConn)
263+
}
264+
}()
265+
results, err := speedtest.RunClient(direction, duration, fmt.Sprintf("127.0.0.1:%d", tcpAddr.Port))
266+
if err != nil {
267+
return nil, xerrors.Errorf("run speedtest: %w", err)
268+
}
269+
_ = listener.Close()
270+
select {
271+
case <-closed:
272+
case err = <-errs:
273+
}
274+
return results, err
275+
}
276+
230277
func (c *TailnetConn) DialContext(ctx context.Context, network string, addr string) (net.Conn, error) {
231278
_, rawPort, _ := net.SplitHostPort(addr)
232279
port, _ := strconv.Atoi(rawPort)

cli/root.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ func Core() []*cobra.Command {
7878
schedules(),
7979
show(),
8080
ssh(),
81+
speedtest(),
8182
start(),
8283
state(),
8384
stop(),

cli/speedtest.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package cli
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
"cdr.dev/slog"
9+
"github.com/coder/coder/cli/cliui"
10+
"github.com/coder/coder/codersdk"
11+
"github.com/jedib0t/go-pretty/v6/table"
12+
"github.com/spf13/cobra"
13+
"golang.org/x/xerrors"
14+
tsspeedtest "tailscale.com/net/speedtest"
15+
)
16+
17+
func speedtest() *cobra.Command {
18+
return &cobra.Command{
19+
Annotations: workspaceCommand,
20+
Use: "speedtest <workspace>",
21+
Short: "Run a speed test uploading from your machine to the workspace.",
22+
RunE: func(cmd *cobra.Command, args []string) error {
23+
ctx, cancel := context.WithCancel(cmd.Context())
24+
defer cancel()
25+
26+
client, err := CreateClient(cmd)
27+
if err != nil {
28+
return xerrors.Errorf("create codersdk client: %w", err)
29+
}
30+
31+
workspace, workspaceAgent, err := getWorkspaceAndAgent(ctx, cmd, client, codersdk.Me, args[0], false)
32+
if err != nil {
33+
return err
34+
}
35+
36+
err = cliui.Agent(ctx, cmd.ErrOrStderr(), cliui.AgentOptions{
37+
WorkspaceName: workspace.Name,
38+
Fetch: func(ctx context.Context) (codersdk.WorkspaceAgent, error) {
39+
return client.WorkspaceAgent(ctx, workspaceAgent.ID)
40+
},
41+
})
42+
if err != nil {
43+
return xerrors.Errorf("await agent: %w", err)
44+
}
45+
conn, err := client.DialWorkspaceAgentTailnet(ctx, slog.Logger{}, workspaceAgent.ID)
46+
if err != nil {
47+
return err
48+
}
49+
defer conn.Close()
50+
results, err := conn.Speedtest(tsspeedtest.Upload, 5*time.Second)
51+
if err != nil {
52+
return err
53+
}
54+
tableWriter := cliui.Table()
55+
tableWriter.AppendHeader(table.Row{"Interval", "Transfer", "Bandwidth"})
56+
for _, r := range results {
57+
if r.Total {
58+
tableWriter.AppendSeparator()
59+
}
60+
tableWriter.AppendRow(table.Row{
61+
fmt.Sprintf("%.2f-%.2f sec", r.IntervalStart.Seconds(), r.IntervalEnd.Seconds()),
62+
fmt.Sprintf("%.4f MBits", r.MegaBits()),
63+
fmt.Sprintf("%.4f Mbits/sec", r.MBitsPerSecond()),
64+
})
65+
}
66+
_, err = fmt.Fprintln(cmd.OutOrStdout(), tableWriter.Render())
67+
return err
68+
},
69+
}
70+
}

cli/speedtest_test.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package cli_test
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
"cdr.dev/slog/sloggers/slogtest"
8+
"github.com/coder/coder/agent"
9+
"github.com/coder/coder/cli/clitest"
10+
"github.com/coder/coder/coderd/coderdtest"
11+
"github.com/coder/coder/codersdk"
12+
"github.com/coder/coder/pty/ptytest"
13+
"github.com/coder/coder/testutil"
14+
"github.com/stretchr/testify/assert"
15+
)
16+
17+
func TestSpeedtest(t *testing.T) {
18+
t.Parallel()
19+
if testing.Short() {
20+
t.Skip("This test takes a minimum of 5ms per a hardcoded value in Tailscale!")
21+
}
22+
client, workspace, agentToken := setupWorkspaceForAgent(t)
23+
agentClient := codersdk.New(client.URL)
24+
agentClient.SessionToken = agentToken
25+
agentCloser := agent.New(agent.Options{
26+
FetchMetadata: agentClient.WorkspaceAgentMetadata,
27+
WebRTCDialer: agentClient.ListenWorkspaceAgent,
28+
CoordinatorDialer: agentClient.ListenWorkspaceAgentTailnet,
29+
Logger: slogtest.Make(t, nil).Named("agent"),
30+
})
31+
defer agentCloser.Close()
32+
coderdtest.AwaitWorkspaceAgents(t, client, workspace.LatestBuild.ID)
33+
34+
cmd, root := clitest.New(t, "speedtest", workspace.Name)
35+
clitest.SetupConfig(t, client, root)
36+
pty := ptytest.New(t)
37+
cmd.SetOut(pty.Output())
38+
39+
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
40+
defer cancel()
41+
cmdDone := tGo(t, func() {
42+
err := cmd.ExecuteContext(ctx)
43+
assert.NoError(t, err)
44+
})
45+
<-cmdDone
46+
}

cli/ssh_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ import (
3131
"github.com/coder/coder/testutil"
3232
)
3333

34-
func setupWorkspaceForSSH(t *testing.T) (*codersdk.Client, codersdk.Workspace, string) {
34+
func setupWorkspaceForAgent(t *testing.T) (*codersdk.Client, codersdk.Workspace, string) {
3535
t.Helper()
3636
client := coderdtest.New(t, &coderdtest.Options{IncludeProvisionerDaemon: true})
3737
user := coderdtest.CreateFirstUser(t, client)
@@ -69,7 +69,7 @@ func TestSSH(t *testing.T) {
6969
t.Run("ImmediateExit", func(t *testing.T) {
7070
t.Parallel()
7171

72-
client, workspace, agentToken := setupWorkspaceForSSH(t)
72+
client, workspace, agentToken := setupWorkspaceForAgent(t)
7373
cmd, root := clitest.New(t, "ssh", workspace.Name)
7474
clitest.SetupConfig(t, client, root)
7575
pty := ptytest.New(t)
@@ -104,7 +104,7 @@ func TestSSH(t *testing.T) {
104104
})
105105
t.Run("Stdio", func(t *testing.T) {
106106
t.Parallel()
107-
client, workspace, agentToken := setupWorkspaceForSSH(t)
107+
client, workspace, agentToken := setupWorkspaceForAgent(t)
108108
_, _ = tGoContext(t, func(ctx context.Context) {
109109
// Run this async so the SSH command has to wait for
110110
// the build and agent to connect!
@@ -175,7 +175,7 @@ func TestSSH(t *testing.T) {
175175

176176
t.Parallel()
177177

178-
client, workspace, agentToken := setupWorkspaceForSSH(t)
178+
client, workspace, agentToken := setupWorkspaceForAgent(t)
179179

180180
agentClient := codersdk.New(client.URL)
181181
agentClient.SessionToken = agentToken

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ require (
157157
k8s.io/utils v0.0.0-20220210201930-3a6ce19ff2f9
158158
nhooyr.io/websocket v1.8.7
159159
storj.io/drpc v0.0.33-0.20220622181519-9206537a4db7
160-
tailscale.com v1.26.2
160+
tailscale.com v1.30.0
161161
)
162162

163163
require (

0 commit comments

Comments
 (0)
0