chore(vpn): send ping results over tunnel by ethanndickson · Pull Request #18200 · coder/coder

chore(vpn): send ping results over tunnel #18200


Merged · 13 commits merged on Jun 6, 2025
actually fixed with extra tests
ethanndickson committed Jun 5, 2025
commit 6b5ea629603abbb745a58b3506cb2f30854a9e7b
88 changes: 51 additions & 37 deletions vpn/tunnel.go
@@ -619,47 +619,61 @@ func (u *updater) recordLatencies() {
	for _, agent := range u.agents {
		agentsIDsToPing = append(agentsIDsToPing, agent.ID)
	}
+	conn := u.conn
	u.mu.Unlock()

-	for _, agentID := range agentsIDsToPing {
-		go func() {
-			pingCtx, cancelFunc := context.WithTimeout(u.ctx, 5*time.Second)
-			defer cancelFunc()
-			pingDur, didP2p, pingResult, err := u.conn.Ping(pingCtx, agentID)
-			if err != nil {
-				u.logger.Warn(u.ctx, "failed to ping agent", slog.F("agent_id", agentID), slog.Error(err))
-				return
-			}
-
-			u.mu.Lock()
-			defer u.mu.Unlock()
-			if u.conn == nil {
-				u.logger.Debug(u.ctx, "ignoring ping result as connection is closed", slog.F("agent_id", agentID))
-				return
-			}
-			node := u.conn.Node()
-			derpMap := u.conn.DERPMap()
-			derpLatencies := tailnet.ExtractDERPLatency(node, derpMap)
-			preferredDerp := tailnet.ExtractPreferredDERPName(pingResult, node, derpMap)
-			var preferredDerpLatency *time.Duration
-			if derpLatency, ok := derpLatencies[preferredDerp]; ok {
-				preferredDerpLatency = &derpLatency
-			} else {
-				u.logger.Debug(u.ctx, "preferred DERP not found in DERP latency map", slog.F("preferred_derp", preferredDerp))
-			}
-			if agent, ok := u.agents[agentID]; ok {
-				agent.lastPing = &lastPing{
-					pingDur:              pingDur,
-					didP2p:               didP2p,
-					preferredDerp:        preferredDerp,
-					preferredDerpLatency: preferredDerpLatency,
-				}
-				u.agents[agentID] = agent
-			} else {
-				u.logger.Debug(u.ctx, "ignoring ping result for unknown agent", slog.F("agent_id", agentID))
-			}
-		}()
-	}
+	if conn == nil {
+		u.logger.Debug(u.ctx, "skipping pings as tunnel is not connected")
+		return
+	}
+
+	go func() {
+		// We need a waitgroup to cancel the context after all pings are done.
+		var wg sync.WaitGroup
+		pingCtx, cancelFunc := context.WithTimeout(u.ctx, 5*time.Second)
+		defer cancelFunc()
+		for _, agentID := range agentsIDsToPing {
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
+
+				pingDur, didP2p, pingResult, err := conn.Ping(pingCtx, agentID)
+				if err != nil {
+					u.logger.Warn(u.ctx, "failed to ping agent", slog.F("agent_id", agentID), slog.Error(err))
+					return
+				}
+
+				// We fetch the Node and DERPMap after each ping, as it may have
+				// changed.
+				node := conn.Node()
+				derpMap := conn.DERPMap()
+				derpLatencies := tailnet.ExtractDERPLatency(node, derpMap)
+				preferredDerp := tailnet.ExtractPreferredDERPName(pingResult, node, derpMap)
+				var preferredDerpLatency *time.Duration
+				if derpLatency, ok := derpLatencies[preferredDerp]; ok {
+					preferredDerpLatency = &derpLatency
+				} else {
+					u.logger.Debug(u.ctx, "preferred DERP not found in DERP latency map", slog.F("preferred_derp", preferredDerp))
+				}
+
+				// Write back results
+				u.mu.Lock()
+				defer u.mu.Unlock()
+				if agent, ok := u.agents[agentID]; ok {
+					agent.lastPing = &lastPing{
+						pingDur:              pingDur,
+						didP2p:               didP2p,
+						preferredDerp:        preferredDerp,
+						preferredDerpLatency: preferredDerpLatency,
+					}
+					u.agents[agentID] = agent
+				} else {
+					u.logger.Debug(u.ctx, "ignoring ping result for unknown agent", slog.F("agent_id", agentID))
+				}
+			}()
+		}
+		wg.Wait()
+	}()
}

// processSnapshotUpdate handles the logic when a full state update is received.
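The change above boils down to one concurrency pattern: a single context bounds the whole batch of pings, and it is only cancelled after every per-agent goroutine has finished, instead of each ping racing its own timeout. A minimal, self-contained sketch of that pattern (all names here are illustrative, not taken from the Coder codebase):

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// pingOne stands in for conn.Ping: it succeeds after a short delay unless the
// shared context is cancelled first.
func pingOne(ctx context.Context, id int) error {
	select {
	case <-time.After(time.Duration(id*10) * time.Millisecond):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	var wg sync.WaitGroup
	// One context bounds the whole batch; it is cancelled only after wg.Wait()
	// returns, mirroring the deferred cancelFunc after the ping loop in
	// recordLatencies.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	for id := 1; id <= 5; id++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			if err := pingOne(ctx, id); err != nil {
				fmt.Printf("ping %d failed: %v\n", id, err)
				return
			}
			fmt.Printf("ping %d ok\n", id)
		}(id)
	}
	wg.Wait()
}
```

The detail the sketch mirrors from the diff is ordering: `cancel` fires only after `wg.Wait()`, so a slow ping is never cut short just because the rest of the batch finished early.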
206 changes: 198 additions & 8 deletions vpn/tunnel_internal_test.go
@@ -60,11 +60,17 @@ func newFakeConn(state tailnet.WorkspaceUpdate, hsTime time.Time) *fakeConn {
	}
}

+func (f *fakeConn) withManualPings() *fakeConn {
+	f.returnPing = make(chan struct{})
+	return f
+}
+
type fakeConn struct {
-	state   tailnet.WorkspaceUpdate
-	hsTime  time.Time
-	closed  chan struct{}
-	doClose sync.Once
+	state      tailnet.WorkspaceUpdate
+	returnPing chan struct{}
+	hsTime     time.Time
+	closed     chan struct{}
+	doClose    sync.Once
}

func (*fakeConn) DERPMap() *tailcfg.DERPMap {
@@ -90,10 +96,22 @@ func (*fakeConn) Node() *tailnet.Node {

var _ Conn = (*fakeConn)(nil)

-func (*fakeConn) Ping(ctx context.Context, agentID uuid.UUID) (time.Duration, bool, *ipnstate.PingResult, error) {
-	return time.Millisecond * 100, true, &ipnstate.PingResult{
-		DERPRegionID: 999,
-	}, nil
+func (f *fakeConn) Ping(ctx context.Context, agentID uuid.UUID) (time.Duration, bool, *ipnstate.PingResult, error) {
+	if f.returnPing == nil {
+		return time.Millisecond * 100, true, &ipnstate.PingResult{
+			DERPRegionID: 999,
+		}, nil
+	}
+
+	select {
+	case <-ctx.Done():
+		return 0, false, nil, ctx.Err()
+	case <-f.returnPing:
+		return time.Millisecond * 100, true, &ipnstate.PingResult{
+			DERPRegionID: 999,
+		}, nil
+	}
+
}

func (f *fakeConn) CurrentWorkspaceState() (tailnet.WorkspaceUpdate, error) {
@@ -759,6 +777,178 @@ func TestTunnel_sendAgentUpdateWorkspaceReconnect(t *testing.T) {
require.Equal(t, wID1[:], peerUpdate.DeletedWorkspaces[0].Id)
}

func TestTunnel_slowPing(t *testing.T) {
t.Parallel()

ctx := testutil.Context(t, testutil.WaitShort)

mClock := quartz.NewMock(t)

wID1 := uuid.UUID{1}
aID1 := uuid.UUID{2}
hsTime := time.Now().Add(-time.Minute).UTC()

client := newFakeClient(ctx, t)
conn := newFakeConn(tailnet.WorkspaceUpdate{}, hsTime).withManualPings()

tun, mgr := setupTunnel(t, ctx, client, mClock)
errCh := make(chan error, 1)
var resp *TunnelMessage
go func() {
r, err := mgr.unaryRPC(ctx, &ManagerMessage{
Msg: &ManagerMessage_Start{
Start: &StartRequest{
TunnelFileDescriptor: 2,
CoderUrl: "https://coder.example.com",
ApiToken: "fakeToken",
},
},
})
resp = r
errCh <- err
}()
testutil.RequireSend(ctx, t, client.ch, conn)
err := testutil.TryReceive(ctx, t, errCh)
require.NoError(t, err)
_, ok := resp.Msg.(*TunnelMessage_Start)
require.True(t, ok)

// Inform the tunnel of the initial state
err = tun.Update(tailnet.WorkspaceUpdate{
UpsertedWorkspaces: []*tailnet.Workspace{
{
ID: wID1, Name: "w1", Status: proto.Workspace_STARTING,
},
},
UpsertedAgents: []*tailnet.Agent{
{
ID: aID1,
Name: "agent1",
WorkspaceID: wID1,
Hosts: map[dnsname.FQDN][]netip.Addr{
"agent1.coder.": {netip.MustParseAddr("fd60:627a:a42b:0101::")},
},
},
},
})
require.NoError(t, err)
req := testutil.TryReceive(ctx, t, mgr.requests)
require.Nil(t, req.msg.Rpc)
require.NotNil(t, req.msg.GetPeerUpdate())
require.Len(t, req.msg.GetPeerUpdate().UpsertedAgents, 1)
require.Equal(t, aID1[:], req.msg.GetPeerUpdate().UpsertedAgents[0].Id)

// We can't check that it *never* pings, so the best we can do is
// check it doesn't ping even with 5 goroutines attempting to,
// and that updates are received as normal
for range 5 {
mClock.AdvanceNext()
require.Nil(t, req.msg.GetPeerUpdate().UpsertedAgents[0].LastPing)
}

// Provided that it hasn't been 5 seconds since the last AdvanceNext call,
// there'll be a ping in-flight that will return with this message
testutil.RequireSend(ctx, t, conn.returnPing, struct{}{})
// Which will mean we'll eventually receive a PeerUpdate with the ping
testutil.Eventually(ctx, t, func(ctx context.Context) bool {
mClock.AdvanceNext()
req = testutil.TryReceive(ctx, t, mgr.requests)
if len(req.msg.GetPeerUpdate().UpsertedAgents) == 0 {
return false
}
if req.msg.GetPeerUpdate().UpsertedAgents[0].LastPing == nil {
return false
}
if req.msg.GetPeerUpdate().UpsertedAgents[0].LastPing.Latency.AsDuration().Milliseconds() != 100 {
return false
}
return req.msg.GetPeerUpdate().UpsertedAgents[0].LastPing.PreferredDerp == "Coder Region"
}, testutil.IntervalFast)
}

func TestTunnel_stopMidPing(t *testing.T) {
t.Parallel()

ctx := testutil.Context(t, testutil.WaitShort)

mClock := quartz.NewMock(t)

wID1 := uuid.UUID{1}
aID1 := uuid.UUID{2}
hsTime := time.Now().Add(-time.Minute).UTC()

client := newFakeClient(ctx, t)
conn := newFakeConn(tailnet.WorkspaceUpdate{}, hsTime).withManualPings()

tun, mgr := setupTunnel(t, ctx, client, mClock)
errCh := make(chan error, 1)
var resp *TunnelMessage
go func() {
r, err := mgr.unaryRPC(ctx, &ManagerMessage{
Msg: &ManagerMessage_Start{
Start: &StartRequest{
TunnelFileDescriptor: 2,
CoderUrl: "https://coder.example.com",
ApiToken: "fakeToken",
},
},
})
resp = r
errCh <- err
}()
testutil.RequireSend(ctx, t, client.ch, conn)
err := testutil.TryReceive(ctx, t, errCh)
require.NoError(t, err)
_, ok := resp.Msg.(*TunnelMessage_Start)
require.True(t, ok)

// Inform the tunnel of the initial state
err = tun.Update(tailnet.WorkspaceUpdate{
UpsertedWorkspaces: []*tailnet.Workspace{
{
ID: wID1, Name: "w1", Status: proto.Workspace_STARTING,
},
},
UpsertedAgents: []*tailnet.Agent{
{
ID: aID1,
Name: "agent1",
WorkspaceID: wID1,
Hosts: map[dnsname.FQDN][]netip.Addr{
"agent1.coder.": {netip.MustParseAddr("fd60:627a:a42b:0101::")},
},
},
},
})
require.NoError(t, err)
req := testutil.TryReceive(ctx, t, mgr.requests)
require.Nil(t, req.msg.Rpc)
require.NotNil(t, req.msg.GetPeerUpdate())
require.Len(t, req.msg.GetPeerUpdate().UpsertedAgents, 1)
require.Equal(t, aID1[:], req.msg.GetPeerUpdate().UpsertedAgents[0].Id)

// We'll have some pings in flight when we stop
for range 5 {
mClock.AdvanceNext()
req = testutil.TryReceive(ctx, t, mgr.requests)
require.Nil(t, req.msg.GetPeerUpdate().UpsertedAgents[0].LastPing)
}

// Stop the tunnel
go func() {
r, err := mgr.unaryRPC(ctx, &ManagerMessage{
Msg: &ManagerMessage_Stop{},
})
resp = r
errCh <- err
}()
testutil.TryReceive(ctx, t, conn.closed)
err = testutil.TryReceive(ctx, t, errCh)
require.NoError(t, err)
_, ok = resp.Msg.(*TunnelMessage_Stop)
require.True(t, ok)
}

//nolint:revive // t takes precedence
func setupTunnel(t *testing.T, ctx context.Context, client *fakeClient, mClock *quartz.Mock) (*Tunnel, *speaker[*ManagerMessage, *TunnelMessage, TunnelMessage]) {
mp, tp := net.Pipe()
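A note on the test fixture: `withManualPings` turns the fake connection's `Ping` into a gate that only returns once the test sends on `returnPing` (or the context expires), which is what lets `TestTunnel_slowPing` and `TestTunnel_stopMidPing` hold pings in flight. A stripped-down sketch of the same gating idea outside this codebase (the `gatedFake` type and test below are illustrative, not part of the PR):

```go
package example

import (
	"context"
	"testing"
	"time"
)

// gatedFake blocks its fake operation until the test sends on release,
// or until the caller's context is cancelled.
type gatedFake struct {
	release chan struct{}
}

func (g *gatedFake) Ping(ctx context.Context) (time.Duration, error) {
	select {
	case <-ctx.Done():
		return 0, ctx.Err()
	case <-g.release:
		return 100 * time.Millisecond, nil
	}
}

func TestGatedFake(t *testing.T) {
	f := &gatedFake{release: make(chan struct{})}

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	got := make(chan time.Duration, 1)
	go func() {
		d, err := f.Ping(ctx)
		if err != nil {
			t.Error(err)
			return
		}
		got <- d
	}()

	// The ping is now "in flight"; the test decides exactly when it completes.
	f.release <- struct{}{}

	select {
	case d := <-got:
		if d != 100*time.Millisecond {
			t.Fatalf("unexpected latency: %v", d)
		}
	case <-ctx.Done():
		t.Fatal("ping never completed")
	}
}
```

Leaving the gate channel `nil` by default preserves the instant-ping behaviour for every existing test, which is why `withManualPings` is opt-in on the fake.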