WIP: very basic manifest stream from server to agent · coder/coder@903f896 · GitHub
[go: up one dir, main page]

Skip to content

Commit 903f896

Browse files
committed
WIP: very basic manifest stream from server to agent
Signed-off-by: Danny Kopping <danny@coder.com>
1 parent 7a25973 commit 903f896

File tree

9 files changed

+356
-189
lines changed

9 files changed

+356
-189
lines changed

agent/agent.go

Lines changed: 153 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ type Options struct {
8888

8989
type Client interface {
9090
ConnectRPC23(ctx context.Context) (
91-
proto.DRPCAgentClient23, tailnetproto.DRPCTailnetClient23, error,
91+
proto.DRPCAgentClient24, tailnetproto.DRPCTailnetClient23, error,
9292
)
9393
RewriteDERPMap(derpMap *tailcfg.DERPMap)
9494
}
@@ -408,7 +408,7 @@ func (t *trySingleflight) Do(key string, fn func()) {
408408
fn()
409409
}
410410

411-
func (a *agent) reportMetadata(ctx context.Context, aAPI proto.DRPCAgentClient23) error {
411+
func (a *agent) reportMetadata(ctx context.Context, aAPI proto.DRPCAgentClient24) error {
412412
tickerDone := make(chan struct{})
413413
collectDone := make(chan struct{})
414414
ctx, cancel := context.WithCancel(ctx)
@@ -624,7 +624,7 @@ func (a *agent) reportMetadata(ctx context.Context, aAPI proto.DRPCAgentClient23
624624

625625
// reportLifecycle reports the current lifecycle state once. All state
626626
// changes are reported in order.
627-
func (a *agent) reportLifecycle(ctx context.Context, aAPI proto.DRPCAgentClient23) error {
627+
func (a *agent) reportLifecycle(ctx context.Context, aAPI proto.DRPCAgentClient24) error {
628628
for {
629629
select {
630630
case <-a.lifecycleUpdate:
@@ -706,7 +706,7 @@ func (a *agent) setLifecycle(state codersdk.WorkspaceAgentLifecycle) {
706706
// fetchServiceBannerLoop fetches the service banner on an interval. It will
707707
// not be fetched immediately; the expectation is that it is primed elsewhere
708708
// (and must be done before the session actually starts).
709-
func (a *agent) fetchServiceBannerLoop(ctx context.Context, aAPI proto.DRPCAgentClient23) error {
709+
func (a *agent) fetchServiceBannerLoop(ctx context.Context, aAPI proto.DRPCAgentClient24) error {
710710
ticker := time.NewTicker(a.announcementBannersRefreshInterval)
711711
defer ticker.Stop()
712712
for {
@@ -759,7 +759,7 @@ func (a *agent) run() (retErr error) {
759759
connMan := newAPIConnRoutineManager(a.gracefulCtx, a.hardCtx, a.logger, aAPI, tAPI)
760760

761761
connMan.startAgentAPI("init notification banners", gracefulShutdownBehaviorStop,
762-
func(ctx context.Context, aAPI proto.DRPCAgentClient23) error {
762+
func(ctx context.Context, aAPI proto.DRPCAgentClient24) error {
763763
bannersProto, err := aAPI.GetAnnouncementBanners(ctx, &proto.GetAnnouncementBannersRequest{})
764764
if err != nil {
765765
return xerrors.Errorf("fetch service banner: %w", err)
@@ -776,7 +776,7 @@ func (a *agent) run() (retErr error) {
776776
// sending logs gets gracefulShutdownBehaviorRemain because we want to send logs generated by
777777
// shutdown scripts.
778778
connMan.startAgentAPI("send logs", gracefulShutdownBehaviorRemain,
779-
func(ctx context.Context, aAPI proto.DRPCAgentClient23) error {
779+
func(ctx context.Context, aAPI proto.DRPCAgentClient24) error {
780780
err := a.logSender.SendLoop(ctx, aAPI)
781781
if xerrors.Is(err, agentsdk.LogLimitExceededError) {
782782
// we don't want this error to tear down the API connection and propagate to the
@@ -813,10 +813,11 @@ func (a *agent) run() (retErr error) {
813813
networkOK := newCheckpoint(a.logger)
814814
manifestOK := newCheckpoint(a.logger)
815815

816-
connMan.startAgentAPI("handle manifest", gracefulShutdownBehaviorStop, a.handleManifest(manifestOK))
816+
//connMan.startAgentAPI("handle manifest", gracefulShutdownBehaviorStop, a.handleManifest(manifestOK))
817+
connMan.startAgentAPI("handle manifest stream", gracefulShutdownBehaviorStop, a.handleManifestStream(manifestOK))
817818

818819
connMan.startAgentAPI("app health reporter", gracefulShutdownBehaviorStop,
819-
func(ctx context.Context, aAPI proto.DRPCAgentClient23) error {
820+
func(ctx context.Context, aAPI proto.DRPCAgentClient24) error {
820821
if err := manifestOK.wait(ctx); err != nil {
821822
return xerrors.Errorf("no manifest: %w", err)
822823
}
@@ -849,7 +850,7 @@ func (a *agent) run() (retErr error) {
849850

850851
connMan.startAgentAPI("fetch service banner loop", gracefulShutdownBehaviorStop, a.fetchServiceBannerLoop)
851852

852-
connMan.startAgentAPI("stats report loop", gracefulShutdownBehaviorStop, func(ctx context.Context, aAPI proto.DRPCAgentClient23) error {
853+
connMan.startAgentAPI("stats report loop", gracefulShutdownBehaviorStop, func(ctx context.Context, aAPI proto.DRPCAgentClient24) error {
853854
if err := networkOK.wait(ctx); err != nil {
854855
return xerrors.Errorf("no network: %w", err)
855856
}
@@ -860,118 +861,173 @@ func (a *agent) run() (retErr error) {
860861
}
861862

862863
// handleManifest returns a function that fetches and processes the manifest
863-
func (a *agent) handleManifest(manifestOK *checkpoint) func(ctx context.Context, aAPI proto.DRPCAgentClient23) error {
864-
return func(ctx context.Context, aAPI proto.DRPCAgentClient23) error {
865-
var (
866-
sentResult = false
867-
err error
868-
)
864+
func (a *agent) handleManifest(manifestOK *checkpoint) func(ctx context.Context, aAPI proto.DRPCAgentClient24) error {
865+
return func(ctx context.Context, aAPI proto.DRPCAgentClient24) error {
866+
var err error
869867
defer func() {
870-
if !sentResult {
868+
if err != nil {
871869
manifestOK.complete(err)
872870
}
873871
}()
872+
874873
mp, err := aAPI.GetManifest(ctx, &proto.GetManifestRequest{})
875874
if err != nil {
876-
return xerrors.Errorf("fetch metadata: %w", err)
875+
return xerrors.Errorf("fetch manifest: %w", err)
877876
}
878877
a.logger.Info(ctx, "fetched manifest", slog.F("manifest", mp))
879-
manifest, err := agentsdk.ManifestFromProto(mp)
880-
if err != nil {
881-
a.logger.Critical(ctx, "failed to convert manifest", slog.F("manifest", mp), slog.Error(err))
882-
return xerrors.Errorf("convert manifest: %w", err)
883-
}
884-
if manifest.AgentID == uuid.Nil {
885-
return xerrors.New("nil agentID returned by manifest")
886-
}
887-
a.client.RewriteDERPMap(manifest.DERPMap)
878+
return a.handleSingleManifest(ctx, aAPI, manifestOK, mp)
879+
}
880+
}
888881

889-
// Expand the directory and send it back to coderd so external
890-
// applications that rely on the directory can use it.
891-
//
892-
// An example is VS Code Remote, which must know the directory
893-
// before initializing a connection.
894-
manifest.Directory, err = expandDirectory(manifest.Directory)
882+
func (a *agent) handleManifestStream(manifestOK *checkpoint) func(ctx context.Context, aAPI proto.DRPCAgentClient24) error {
883+
return func(ctx context.Context, aAPI proto.DRPCAgentClient24) error {
884+
var err error
885+
defer func() {
886+
if err != nil {
887+
manifestOK.complete(err)
888+
}
889+
}()
890+
891+
client, err := aAPI.StreamManifests(ctx, &proto.GetManifestRequest{})
895892
if err != nil {
896-
return xerrors.Errorf("expand directory: %w", err)
893+
if err == io.EOF {
894+
a.logger.Info(ctx, "stream manifest received EOF")
895+
return nil
896+
}
897+
return xerrors.Errorf("stream manifests: %w", err)
897898
}
898-
subsys, err := agentsdk.ProtoFromSubsystems(a.subsystems)
899-
if err != nil {
900-
a.logger.Critical(ctx, "failed to convert subsystems", slog.Error(err))
901-
return xerrors.Errorf("failed to convert subsystems: %w", err)
899+
900+
for {
901+
a.logger.Debug(ctx, "waiting on new streamed manifest")
902+
903+
manifest, err := client.Recv()
904+
if err != nil {
905+
return xerrors.Errorf("recv manifest: %w", err)
906+
}
907+
908+
a.logger.Info(ctx, "received new streamed manifest", slog.F("manifest", manifest))
909+
910+
err = a.handleSingleManifest(ctx, aAPI, manifestOK, manifest)
911+
if err != nil {
912+
return xerrors.Errorf("handle streamed manifest: %w", err)
913+
}
902914
}
903-
_, err = aAPI.UpdateStartup(ctx, &proto.UpdateStartupRequest{Startup: &proto.Startup{
904-
Version: buildinfo.Version(),
905-
ExpandedDirectory: manifest.Directory,
906-
Subsystems: subsys,
907-
}})
908-
if err != nil {
909-
return xerrors.Errorf("update workspace agent startup: %w", err)
915+
}
916+
}
917+
918+
// TODO: change signature to just take in all inputs instead of returning closure; return error
919+
func (a *agent) handleSingleManifest(ctx context.Context, aAPI proto.DRPCAgentClient24, manifestOK *checkpoint, mp *proto.Manifest) error {
920+
var (
921+
sentResult bool
922+
err error
923+
)
924+
defer func() {
925+
if !sentResult {
926+
manifestOK.complete(err)
910927
}
928+
}()
911929

912-
oldManifest := a.manifest.Swap(&manifest)
913-
manifestOK.complete(nil)
914-
sentResult = true
915-
916-
// The startup script should only execute on the first run!
917-
if oldManifest == nil {
918-
a.setLifecycle(codersdk.WorkspaceAgentLifecycleStarting)
919-
920-
// Perform overrides early so that Git auth can work even if users
921-
// connect to a workspace that is not yet ready. We don't run this
922-
// concurrently with the startup script to avoid conflicts between
923-
// them.
924-
if manifest.GitAuthConfigs > 0 {
925-
// If this fails, we should consider surfacing the error in the
926-
// startup log and setting the lifecycle state to be "start_error"
927-
// (after startup script completion), but for now we'll just log it.
928-
err := gitauth.OverrideVSCodeConfigs(a.filesystem)
929-
if err != nil {
930-
a.logger.Warn(ctx, "failed to override vscode git auth configs", slog.Error(err))
931-
}
932-
}
930+
manifest, err := agentsdk.ManifestFromProto(mp)
931+
if err != nil {
932+
a.logger.Critical(ctx, "failed to convert manifest", slog.F("manifest", mp), slog.Error(err))
933+
return xerrors.Errorf("convert manifest: %w", err)
934+
}
935+
if manifest.AgentID == uuid.Nil {
936+
return xerrors.New("nil agentID returned by manifest")
937+
}
938+
a.client.RewriteDERPMap(manifest.DERPMap)
933939

934-
err = a.scriptRunner.Init(manifest.Scripts, aAPI.ScriptCompleted)
940+
// Expand the directory and send it back to coderd so external
941+
// applications that rely on the directory can use it.
942+
//
943+
// An example is VS Code Remote, which must know the directory
944+
// before initializing a connection.
945+
manifest.Directory, err = expandDirectory(manifest.Directory)
946+
if err != nil {
947+
return xerrors.Errorf("expand directory: %w", err)
948+
}
949+
subsys, err := agentsdk.ProtoFromSubsystems(a.subsystems)
950+
if err != nil {
951+
a.logger.Critical(ctx, "failed to convert subsystems", slog.Error(err))
952+
return xerrors.Errorf("failed to convert subsystems: %w", err)
953+
}
954+
_, err = aAPI.UpdateStartup(ctx, &proto.UpdateStartupRequest{Startup: &proto.Startup{
955+
Version: buildinfo.Version(),
956+
ExpandedDirectory: manifest.Directory,
957+
Subsystems: subsys,
958+
}})
959+
if err != nil {
960+
return xerrors.Errorf("update workspace agent startup: %w", err)
961+
}
962+
963+
oldManifest := a.manifest.Swap(&manifest)
964+
manifestOK.complete(nil)
965+
sentResult = true
966+
967+
// TODO: remove
968+
a.logger.Info(ctx, "NOW OWNED BY", slog.F("owner", manifest.OwnerName))
969+
970+
// TODO: this will probably have to change in the case of prebuilds; maybe check if owner is the same,
971+
// or add prebuild metadata to manifest?
972+
// The startup script should only execute on the first run!
973+
if oldManifest == nil {
974+
a.setLifecycle(codersdk.WorkspaceAgentLifecycleStarting)
975+
976+
// Perform overrides early so that Git auth can work even if users
977+
// connect to a workspace that is not yet ready. We don't run this
978+
// concurrently with the startup script to avoid conflicts between
979+
// them.
980+
if manifest.GitAuthConfigs > 0 {
981+
// If this fails, we should consider surfacing the error in the
982+
// startup log and setting the lifecycle state to be "start_error"
983+
// (after startup script completion), but for now we'll just log it.
984+
err := gitauth.OverrideVSCodeConfigs(a.filesystem)
935985
if err != nil {
936-
return xerrors.Errorf("init script runner: %w", err)
986+
a.logger.Warn(ctx, "failed to override vscode git auth configs", slog.Error(err))
937987
}
938-
err = a.trackGoroutine(func() {
939-
start := time.Now()
940-
// here we use the graceful context because the script runner is not directly tied
941-
// to the agent API.
942-
err := a.scriptRunner.Execute(a.gracefulCtx, agentscripts.ExecuteStartScripts)
943-
// Measure the time immediately after the script has finished
944-
dur := time.Since(start).Seconds()
945-
if err != nil {
946-
a.logger.Warn(ctx, "startup script(s) failed", slog.Error(err))
947-
if errors.Is(err, agentscripts.ErrTimeout) {
948-
a.setLifecycle(codersdk.WorkspaceAgentLifecycleStartTimeout)
949-
} else {
950-
a.setLifecycle(codersdk.WorkspaceAgentLifecycleStartError)
951-
}
988+
}
989+
990+
err = a.scriptRunner.Init(manifest.Scripts, aAPI.ScriptCompleted)
991+
if err != nil {
992+
return xerrors.Errorf("init script runner: %w", err)
993+
}
994+
err = a.trackGoroutine(func() {
995+
start := time.Now()
996+
// here we use the graceful context because the script runner is not directly tied
997+
// to the agent API.
998+
err := a.scriptRunner.Execute(a.gracefulCtx, agentscripts.ExecuteStartScripts)
999+
// Measure the time immediately after the script has finished
1000+
dur := time.Since(start).Seconds()
1001+
if err != nil {
1002+
a.logger.Warn(ctx, "startup script(s) failed", slog.Error(err))
1003+
if errors.Is(err, agentscripts.ErrTimeout) {
1004+
a.setLifecycle(codersdk.WorkspaceAgentLifecycleStartTimeout)
9521005
} else {
953-
a.setLifecycle(codersdk.WorkspaceAgentLifecycleReady)
1006+
a.setLifecycle(codersdk.WorkspaceAgentLifecycleStartError)
9541007
}
1008+
} else {
1009+
a.setLifecycle(codersdk.WorkspaceAgentLifecycleReady)
1010+
}
9551011

956-
label := "false"
957-
if err == nil {
958-
label = "true"
959-
}
960-
a.metrics.startupScriptSeconds.WithLabelValues(label).Set(dur)
961-
a.scriptRunner.StartCron()
962-
})
963-
if err != nil {
964-
return xerrors.Errorf("track conn goroutine: %w", err)
1012+
label := "false"
1013+
if err == nil {
1014+
label = "true"
9651015
}
1016+
a.metrics.startupScriptSeconds.WithLabelValues(label).Set(dur)
1017+
a.scriptRunner.StartCron()
1018+
})
1019+
if err != nil {
1020+
return xerrors.Errorf("track conn goroutine: %w", err)
9661021
}
967-
return nil
9681022
}
1023+
1024+
return nil
9691025
}
9701026

9711027
// createOrUpdateNetwork waits for the manifest to be set using manifestOK, then creates or updates
9721028
// the tailnet using the information in the manifest
973-
func (a *agent) createOrUpdateNetwork(manifestOK, networkOK *checkpoint) func(context.Context, proto.DRPCAgentClient23) error {
974-
return func(ctx context.Context, _ proto.DRPCAgentClient23) (retErr error) {
1029+
func (a *agent) createOrUpdateNetwork(manifestOK, networkOK *checkpoint) func(context.Context, proto.DRPCAgentClient24) error {
1030+
return func(ctx context.Context, _ proto.DRPCAgentClient24) (retErr error) {
9751031
if err := manifestOK.wait(ctx); err != nil {
9761032
return xerrors.Errorf("no manifest: %w", err)
9771033
}
@@ -1692,7 +1748,7 @@ const (
16921748

16931749
type apiConnRoutineManager struct {
16941750
logger slog.Logger
1695-
aAPI proto.DRPCAgentClient23
1751+
aAPI proto.DRPCAgentClient24
16961752
tAPI tailnetproto.DRPCTailnetClient23
16971753
eg *errgroup.Group
16981754
stopCtx context.Context
@@ -1701,7 +1757,7 @@ type apiConnRoutineManager struct {
17011757

17021758
func newAPIConnRoutineManager(
17031759
gracefulCtx, hardCtx context.Context, logger slog.Logger,
1704-
aAPI proto.DRPCAgentClient23, tAPI tailnetproto.DRPCTailnetClient23,
1760+
aAPI proto.DRPCAgentClient24, tAPI tailnetproto.DRPCTailnetClient23,
17051761
) *apiConnRoutineManager {
17061762
// routines that remain in operation during graceful shutdown use the remainCtx. They'll still
17071763
// exit if the errgroup hits an error, which usually means a problem with the conn.
@@ -1734,7 +1790,7 @@ func newAPIConnRoutineManager(
17341790
// but for Tailnet.
17351791
func (a *apiConnRoutineManager) startAgentAPI(
17361792
name string, behavior gracefulShutdownBehavior,
1737-
f func(context.Context, proto.DRPCAgentClient23) error,
1793+
f func(context.Context, proto.DRPCAgentClient24) error,
17381794
) {
17391795
logger := a.logger.With(slog.F("name", name))
17401796
var ctx context.Context

agent/agenttest/client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ func (c *Client) Close() {
9797
}
9898

9999
func (c *Client) ConnectRPC23(ctx context.Context) (
100-
agentproto.DRPCAgentClient23, proto.DRPCTailnetClient23, error,
100+
agentproto.DRPCAgentClient24, proto.DRPCTailnetClient23, error,
101101
) {
102102
conn, lis := drpcsdk.MemTransportPipe()
103103
c.LastWorkspaceAgent = func() {

0 commit comments

Comments
 (0)
0