8000 feat: agent uses Tailnet v2 API for DERPMap updates · coder/coder@a0dd3a7 · GitHub
[go: up one dir, main page]

Skip to content

Commit a0dd3a7

Browse files
committed
feat: agent uses Tailnet v2 API for DERPMap updates
1 parent 8ad2024 commit a0dd3a7

File tree

8 files changed

+106
-196
lines changed
  • codersdk/agentsdk
  • tailnet
  • 8 files changed

    +106
    -196
    lines changed

    agent/agent.go

    Lines changed: 34 additions & 37 deletions
    Original file line numberDiff line numberDiff line change
    @@ -89,7 +89,6 @@ type Options struct {
    8989
    type Client interface {
    9090
    Manifest(ctx context.Context) (agentsdk.Manifest, error)
    9191
    Listen(ctx context.Context) (drpc.Conn, error)
    92-
    DERPMapUpdates(ctx context.Context) (<-chan agentsdk.DERPMapUpdate, io.Closer, error)
    9392
    ReportStats(ctx context.Context, log slog.Logger, statsChan 8000 <-chan *agentsdk.Stats, setInterval func(time.Duration)) (io.Closer, error)
    9493
    PostLifecycle(ctx context.Context, state agentsdk.PostLifecycleRequest) error
    9594
    PostAppHealth(ctx context.Context, req agentsdk.PostAppHealthsRequest) error
    @@ -822,10 +821,22 @@ func (a *agent) run(ctx context.Context) error {
    822821
    network.SetBlockEndpoints(manifest.DisableDirectConnections)
    823822
    }
    824823

    824+
    // Listen returns the dRPC connection we use for both Coordinator and DERPMap updates
    825+
    conn, err := a.client.Listen(ctx)
    826+
    if err != nil {
    827+
    return err
    828+
    }
    829+
    defer func() {
    830+
    cErr := conn.Close()
    831+
    if cErr != nil {
    832+
    a.logger.Debug(ctx, "error closing drpc connection", slog.Error(err))
    833+
    }
    834+
    }()
    835+
    825836
    eg, egCtx := errgroup.WithContext(ctx)
    826837
    eg.Go(func() error {
    827838
    a.logger.Debug(egCtx, "running tailnet connection coordinator")
    828-
    err := a.runCoordinator(egCtx, network)
    839+
    err := a.runCoordinator(egCtx, conn, network)
    829840
    if err != nil {
    830841
    return xerrors.Errorf("run coordinator: %w", err)
    831842
    }
    @@ -834,7 +845,7 @@ func (a *agent) run(ctx context.Context) error {
    834845

    835846
    eg.Go(func() error {
    836847
    a.logger.Debug(egCtx, "running derp map subscriber")
    837-
    err := a.runDERPMapSubscriber(egCtx, network)
    848+
    err := a.runDERPMapSubscriber(egCtx, conn, network)
    838849
    if err != nil {
    839850
    return xerrors.Errorf("run derp map subscriber: %w", err)
    840851
    }
    @@ -1056,21 +1067,8 @@ func (a *agent) createTailnet(ctx context.Context, agentID uuid.UUID, derpMap *t
    10561067

    10571068
    // runCoordinator runs a coordinator and returns whether a reconnect
    10581069
    // should occur.
    1059-
    func (a *agent) runCoordinator(ctx context.Context, network *tailnet.Conn) error {
    1060-
    ctx, cancel := context.WithCancel(ctx)
    1061-
    defer cancel()
    1062-
    1063-
    conn, err := a.client.Listen(ctx)
    1064-
    if err != nil {
    1065-
    return err
    1066-
    }
    1067-
    defer func() {
    1068-
    cErr := conn.Close()
    1069-
    if cErr != nil {
    1070-
    a.logger.Debug(ctx, "error closing drpc connection", slog.Error(err))
    1071-
    }
    1072-
    }()
    1073-
    1070+
    func (a *agent) runCoordinator(ctx context.Context, conn drpc.Conn, network *tailnet.Conn) error {
    1071+
    defer a.logger.Debug(ctx, "disconnected from coordination RPC")
    10741072
    tClient := tailnetproto.NewDRPCTailnetClient(conn)
    10751073
    coordinate, err := tClient.Coordinate(ctx)
    10761074
    if err != nil {
    @@ -1082,7 +1080,7 @@ func (a *agent) runCoordinator(ctx context.Context, network *tailnet.Conn) error
    10821080
    a.logger.Debug(ctx, "error closing Coordinate client", slog.Error(err))
    10831081
    }
    10841082
    }()
    1085-
    a.logger.Info(ctx, "connected to coordination endpoint")
    1083+
    a.logger.Info(ctx, "connected to coordination RPC")
    10861084
    coordination := tailnet.NewRemoteCoordination(a.logger, coordinate, network, uuid.Nil)
    10871085
    select {
    10881086
    case <-ctx.Done():
    @@ -1093,30 +1091,29 @@ func (a *agent) runCoordinator(ctx context.Context, network *tailnet.Conn) error
    10931091
    }
    10941092

    10951093
    // runDERPMapSubscriber runs a coordinator and returns if a reconnect should occur.
    1096-
    func (a *agent) runDERPMapSubscriber(ctx context.Context, network *tailnet.Conn) error {
    1094+
    func (a *agent) runDERPMapSubscriber(ctx context.Context, conn drpc.Conn, network *tailnet.Conn) error {
    1095+
    defer a.logger.Debug(ctx, "disconnected from derp map RPC")
    10971096
    ctx, cancel := context.WithCancel(ctx)
    10981097
    defer cancel()
    1099-
    1100-
    updates, closer, err := a.client.DERPMapUpdates(ctx)
    1098+
    tClient := tailnetproto.NewDRPCTailnetClient(conn)
    1099+
    stream, err := tClient.StreamDERPMaps(ctx, &tailnetproto.StreamDERPMapsRequest{})
    11011100
    if err != nil {
    1102-
    return err
    1101+
    return xerrors.Errorf("stream DERP Maps: %w", err)
    11031102
    }
    1104-
    defer closer.Close()
    1105-
    1106-
    a.logger.Info(ctx, "connected to derp map endpoint")
    1103+
    defer func() {
    1104+
    cErr := stream.Close()
    1105+
    if cErr != nil {
    1106+
    a.logger.Debug(ctx, "error closing DERPMap stream", slog.Error(err))
    1107+
    }
    1108+
    }()
    1109+
    a.logger.Info(ctx, "connected to derp map RPC")
    11071110
    for {
    1108-
    select {
    1109-
    case <-ctx.Done():
    1110-
    return ctx.Err()
    1111-
    case update := <-updates:
    1112-
    if update.Err != nil {
    1113-
    return update.Err
    1114-
    }
    1115-
    if update.DERPMap != nil && !tailnet.CompareDERPMaps(network.DERPMap(), update.DERPMap) {
    1116-
    a.logger.Info(ctx, "updating derp map due to detected changes")
    1117-
    network.SetDERPMap(update.DERPMap)
    1118-
    }
    1111+
    dmp, err := stream.Recv()
    1112+
    if err != nil {
    1113+
    return xerrors.Errorf("recv DERPMap error: %w", err)
    11191114
    }
    1115+
    dm := tailnet.DERPMapFromProto(dmp)
    1116+
    network.SetDERPMap(dm)
    11201117
    }
    11211118
    }
    11221119

    agent/agent_test.go

    Lines changed: 12 additions & 4 deletions
    Original file line numberDiff line numberDiff line change
    @@ -1349,6 +1349,7 @@ func TestAgent_Lifecycle(t *testing.T) {
    13491349
    make(chan *agentsdk.Stats, 50),
    13501350
    tailnet.NewCoordinator(logger),
    13511351
    )
    1352+
    defer client.Close()
    13521353

    13531354
    fs := afero.NewMemMapFs()
    13541355
    agent := agent.New(agent.Options{
    @@ -1683,13 +1684,18 @@ func TestAgent_UpdatedDERP(t *testing.T) {
    16831684
    statsCh,
    16841685
    coordinator,
    16851686
    )
    1687+
    t.Cleanup(func() {
    1688+
    t.Log("closing client")
    1689+
    client.Close()
    1690+
    })
    16861691
    uut := agent.New(agent.Options{
    16871692
    Client: client,
    16881693
    Filesystem: fs,
    16891694
    Logger: logger.Named("agent"),
    16901695
    ReconnectingPTYTimeout: time.Minute,
    16911696
    })
    16921697
    t.Cleanup(func() {
    1698+
    t.Log("closing agent")
    16931699
    _ = uut.Close()
    16941700
    })
    16951701

    @@ -1718,6 +1724,7 @@ func TestAgent_UpdatedDERP(t *testing.T) {
    17181724
    if err != nil {
    17191725
    t.Logf("error closing in-memory coordination: %s", err.Error())
    17201726
    }
    1727+
    t.Logf("closed coordination %s", name)
    17211728
    })
    17221729
    // Force DERP.
    17231730
    conn.SetBlockEndpoints(true)
    @@ -1753,11 +1760,9 @@ func TestAgent_UpdatedDERP(t *testing.T) {
    17531760
    }
    17541761

    17551762
    // Push a new DERP map to the agent.
    1756-
    err := client.PushDERPMapUpdate(agentsdk.DERPMapUpdate{
    1757-
    DERPMap: newDerpMap,
    1758-
    })
    1763+
    err := client.PushDERPMapUpdate(newDerpMap)
    17591764
    require.NoError(t, err)
    1760-
    t.Logf("client Pushed DERPMap update")
    1765+
    t.Logf("pushed DERPMap update to agent")
    17611766

    17621767
    require.Eventually(t, func() bool {
    17631768
    conn := uut.TailnetConn()
    @@ -1826,6 +1831,7 @@ func TestAgent_Reconnect(t *testing.T) {
    18261831
    statsCh,
    18271832
    coordinator,
    18281833
    )
    1834+
    defer client.Close()
    18291835
    initialized := atomic.Int32{}
    18301836
    closer := agent.New(agent.Options{
    18311837
    ExchangeToken: func(ctx context.Context) (string, error) {
    @@ -1862,6 +1868,7 @@ func TestAgent_WriteVSCodeConfigs(t *testing.T) {
    18621868
    make(chan *agentsdk.Stats, 50),
    18631869
    coordinator,
    18641870
    )
    1871+
    defer client.Close()
    18651872
    filesystem := afero.NewMemMapFs()
    18661873
    closer := agent.New(agent.Options{
    18671874
    ExchangeToken: func(ctx context.Context) (string, error) {
    @@ -2039,6 +2046,7 @@ func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Durati
    20392046
    statsCh := make(chan *agentsdk.Stats, 50)
    20402047
    fs := afero.NewMemMapFs()
    20412048
    c := agenttest.NewClient(t, logger.Named("agent"), metadata.AgentID, metadata, statsCh, coordinator)
    2049+
    t.Cleanup(c.Close)
    20422050

    20432051
    options := agent.Options{
    20442052
    Client: c,

    agent/agenttest/client.go

    Lines changed: 15 additions & 21 deletions
    F438
    Original file line numberDiff line numberDiff line change
    @@ -39,12 +39,12 @@ func NewClient(t testing.TB,
    3939
    coordPtr := atomic.Pointer[tailnet.Coordinator]{}
    4040
    coordPtr.Store(&coordinator)
    4141
    mux := drpcmux.New()
    42+
    derpMapUpdates := make(chan *tailcfg.DERPMap)
    4243
    drpcService := &tailnet.DRPCService{
    43-
    CoordPtr: &coordPtr,
    44-
    Logger: logger,
    45-
    // TODO: handle DERPMap too!
    46-
    DerpMapUpdateFrequency: time.Hour,
    47-
    DerpMapFn: func() *tailcfg.DERPMap { panic("not implemented") },
    44+
    CoordPtr: &coordPtr,
    45+
    Logger: logger,
    46+
    DerpMapUpdateFrequency: time.Microsecond,
    47+
    DerpMapFn: func() *tailcfg.DERPMap { return <-derpMapUpdates },
    4848
    }
    4949
    err := proto.DRPCRegisterTailnet(mux, drpcService)
    5050
    require.NoError(t, err)
    @@ -64,7 +64,7 @@ func NewClient(t testing.TB,
    6464
    statsChan: statsChan,
    6565
    coordinator: coordinator,
    6666
    server: server,
    67-
    derpMapUpdates: make(chan agentsdk.DERPMapUpdate),
    67+
    derpMapUpdates: derpMapUpdates,
    6868
    }
    6969
    }
    7070

    @@ -85,23 +85,26 @@ type Client struct {
    8585
    lifecycleStates []codersdk.WorkspaceAgentLifecycle
    8686
    startup agentsdk.PostStartupRequest
    8787
    logs []agentsdk.Log
    88-
    derpMapUpdates chan agentsdk.DERPMapUpdate
    88+
    derpMapUpdates chan *tailcfg.DERPMap
    89+
    derpMapOnce sync.Once
    90+
    }
    91+
    92+
    func (c *Client) Close() {
    93+
    c.derpMapOnce.Do(func() { close(c.derpMapUpdates) })
    8994
    }
    9095

    9196
    func (c *Client) Manifest(_ context.Context) (agentsdk.Manifest, error) {
    9297
    return c.manifest, nil
    9398
    }
    9499

    95-
    func (c *Client) Listen(_ context.Context) (drpc.Conn, error) {
    100+
    func (c *Client) Listen(ctx context.Context) (drpc.Conn, error) {
    96101
    conn, lis := drpcsdk.MemTransportPipe()
    97-
    closed := make(chan struct{})
    98102
    c.LastWorkspaceAgent = func() {
    99103
    _ = conn.Close()
    100104
    _ = lis.Close()
    101-
    <-closed
    102105
    }
    103106
    c.t.Cleanup(c.LastWorkspaceAgent)
    104-
    serveCtx, cancel := context.WithCancel(context.Background())
    107+
    serveCtx, cancel := context.WithCancel(ctx)
    105108
    c.t.Cleanup(cancel)
    106109
    auth := tailnet.AgentTunnelAuth{}
    107110
    streamID := tailnet.StreamID{
    @@ -112,7 +115,6 @@ func (c *Client) Listen(_ context.Context) (drpc.Conn, error) {
    112115
    serveCtx = tailnet.WithStreamID(serveCtx, streamID)
    113116
    go func() {
    114117
    _ = c.server.Serve(serveCtx, lis)
    115-
    close(closed)
    116118
    }()
    117119
    return conn, nil
    118120
    }
    @@ -235,7 +237,7 @@ func (c *Client) GetServiceBanner(ctx context.Context) (codersdk.ServiceBannerCo
    235237
    return codersdk.ServiceBannerConfig{}, nil
    236238
    }
    237239

    238-
    func (c *Client) PushDERPMapUpdate(update agentsdk.DERPMapUpdate) error {
    240+
    func (c *Client) PushDERPMapUpdate(update *tailcfg.DERPMap) error {
    239241
    timer := time.NewTimer(testutil.WaitShort)
    240242
    defer timer.Stop()
    241243
    select {
    @@ -247,14 +249,6 @@ func (c *Client) PushDERPMapUpdate(update agentsdk.DERPMapUpdate) error {
    247249
    return nil
    248250
    }
    249251

    250-
    func (c *Client) DERPMapUpdates(_ context.Context) (<-chan agentsdk.DERPMapUpdate, io.Closer, error) {
    251-
    closed := make(chan struct{})
    252-
    return c.derpMapUpdates, closeFunc(func() error {
    253-
    close(closed)
    254-
    return nil
    255-
    }), nil
    256-
    }
    257-
    258252
    type closeFunc func() error
    259253

    260254
    func (c closeFunc) Close() error {

    coderd/tailnet_test.go

    Lines changed: 1 addition & 0 deletions
    Original file line numberDiff line numberDiff line change
    @@ -178,6 +178,7 @@ func setupAgent(t *testing.T, agentAddresses []netip.Prefix) (uuid.UUID, agent.A
    178178
    })
    179179

    180180
    c := agenttest.NewClient(t, logger, manifest.AgentID, manifest, make(chan *agentsdk.Stats, 50), coord)
    181+
    t.Cleanup(c.Close)
    181182

    182183
    options := agent.Options{
    183184
    Client: c,

    coderd/wsconncache/wsconncache_test.go

    Lines changed: 23 additions & 33 deletions
    Original file line numberDiff line numberDiff line change
    @@ -171,13 +171,16 @@ func setupAgent(t *testing.T, manifest agentsdk.Manifest, ptyTimeout time.Durati
    171171
    _ = coordinator.Close()
    172172
    })
    173173
    manifest.AgentID = uuid.New()
    174+
    aC := &client{
    175+
    t: t,
    176+
    agentID: manifest.AgentID,
    177+
    manifest: manifest,
    178+
    coordinator: coordinator,
    179+
    derpMapUpdates: make(chan *tailcfg.DERPMap),
    180+
    }
    181+
    t.Cleanup(aC.close)
    174182
    closer := agent.New(agent.Options{
    175-
    Client: &client{
    176-
    t: t,
    177-
    agentID: manifest.AgentID,
    178-
    manifest: manifest,
    179-
    coordinator: coordinator,
    180-
    },
    183+
    Client: aC,
    181184
    Logger: logger.Named("agent"),
    182185
    ReconnectingPTYTimeout: ptyTimeout,
    183186
    Addresses: []netip.Prefix{netip.PrefixFrom(codersdk.WorkspaceAgentIP, 128)},
    @@ -230,32 +233,20 @@ func setupAgent(t *testing.T, manifest agentsdk.Manifest, ptyTimeout time.Durati
    230233
    }
    231234

    232235
    type client struct {
    233-
    t *testing.T
    234-
    agentID uuid.UUID
    235-
    manifest agentsdk.Manifest
    236-
    coordinator tailnet.Coordinator
    237-
    }
    238-
    239-
    func (c *client) Manifest(_ context.Context) (agentsdk.Manifest, error) {
    240-
    return c.manifest, nil
    236+
    t *testing.T
    237+
    agentID uuid.UUID
    238+
    manifest agentsdk.Manifest
    239+
    coordinator tailnet.Coordinator
    240+
    closeOnce sync.Once
    241+
    derpMapUpdates chan *tailcfg.DERPMap
    241242
    }
    242243

    243-
    type closer struct {
    244-
    closeFunc func() error
    244+
    func (c *client) close() {
    245+
    c.closeOnce.Do(func() { close(c.derpMapUpdates) })
    245246
    }
    246247

    247-
    func (c *closer) Close() error {
    248-
    return c.closeFunc()
    249-
    }
    250-
    251-
    func (*client) DERPMapUpdates(_ context.Context) (<-chan agentsdk.DERPMapUpdate, io.Closer, error) {
    252-
    closed := make(chan struct{})
    253-
    return make(<-chan agentsdk.DERPMapUpdate), &closer{
    254-
    closeFunc: func() error {
    255-
    close(closed)
    256-
    return nil
    257-
    },
    258-
    }, nil
    248+
    func (c *client) Manifest(_ context.Context) (agentsdk.Manifest, error) {
    249+
    return c.manifest, nil
    259250
    }
    260251

    261252
    func (c *client) Listen(_ context.Context) (drpc.Conn, error) {
    @@ -271,11 +262,10 @@ func (c *client) Listen(_ context.Context) (drpc.Conn, error) {
    271262
    coordPtr.Store(&c.coordinator)
    272263
    mux := drpcmux.New()
    273264
    drpcService := &tailnet.DRPCService{
    274-
    CoordPtr: &coordPtr,
    275-
    Logger: logger,
    276-
    // TODO: handle DERPMap too!
    277-
    DerpMapUpdateFrequency: time.Hour,
    278-
    DerpMapFn: func() *tailcfg.DERPMap { panic("not implemented") },
    265+
    CoordPtr: &coordPtr,
    266+
    Logger: logger,
    267+
    DerpMapUpdateFrequency: time.Microsecond,
    268+
    DerpMapFn: func() *tailcfg.DERPMap { return <-c.derpMapUpdates },
    279269
    }
    280270
    err := proto.DRPCRegisterTailnet(mux, drpcService)
    281271
    if err != nil {

    0 commit comments

    Comments
     (0)
    0