@@ -88,7 +88,7 @@ type Options struct {
88
88
89
89
type Client interface {
90
90
ConnectRPC23 (ctx context.Context ) (
91
- proto.DRPCAgentClient23 , tailnetproto.DRPCTailnetClient23 , error ,
91
+ proto.DRPCAgentClient24 , tailnetproto.DRPCTailnetClient23 , error ,
92
92
)
93
93
RewriteDERPMap (derpMap * tailcfg.DERPMap )
94
94
}
@@ -408,7 +408,7 @@ func (t *trySingleflight) Do(key string, fn func()) {
408
408
fn ()
409
409
}
410
410
411
- func (a * agent ) reportMetadata (ctx context.Context , aAPI proto.DRPCAgentClient23 ) error {
411
+ func (a * agent ) reportMetadata (ctx context.Context , aAPI proto.DRPCAgentClient24 ) error {
412
412
tickerDone := make (chan struct {})
413
413
collectDone := make (chan struct {})
414
414
ctx , cancel := context .WithCancel (ctx )
@@ -624,7 +624,7 @@ func (a *agent) reportMetadata(ctx context.Context, aAPI proto.DRPCAgentClient23
624
624
625
625
// reportLifecycle reports the current lifecycle state once. All state
626
626
// changes are reported in order.
627
-
6D40
span>func (a * agent ) reportLifecycle (ctx context.Context , aAPI proto.DRPCAgentClient23 ) error {
627
+ func (a * agent ) reportLifecycle (ctx context.Context , aAPI proto.DRPCAgentClient24 ) error {
628
628
for {
629
629
select {
630
630
case <- a .lifecycleUpdate :
@@ -706,7 +706,7 @@ func (a *agent) setLifecycle(state codersdk.WorkspaceAgentLifecycle) {
706
706
// fetchServiceBannerLoop fetches the service banner on an interval. It will
707
707
// not be fetched immediately; the expectation is that it is primed elsewhere
708
708
// (and must be done before the session actually starts).
709
- func (a * agent ) fetchServiceBannerLoop (ctx context.Context , aAPI proto.DRPCAgentClient23 ) error {
709
+ func (a * agent ) fetchServiceBannerLoop (ctx context.Context , aAPI proto.DRPCAgentClient24 ) error {
710
710
ticker := time .NewTicker (a .announcementBannersRefreshInterval )
711
711
defer ticker .Stop ()
712
712
for {
@@ -759,7 +759,7 @@ func (a *agent) run() (retErr error) {
759
759
connMan := newAPIConnRoutineManager (a .gracefulCtx , a .hardCtx , a .logger , aAPI , tAPI )
760
760
761
761
connMan .startAgentAPI ("init notification banners" , gracefulShutdownBehaviorStop ,
762
- func (ctx context.Context , aAPI proto.DRPCAgentClient23 ) error {
762
+ func (ctx context.Context , aAPI proto.DRPCAgentClient24 ) error {
763
763
bannersProto , err := aAPI .GetAnnouncementBanners (ctx , & proto.GetAnnouncementBannersRequest {})
764
764
if err != nil {
765
765
return xerrors .Errorf ("fetch service banner: %w" , err )
@@ -776,7 +776,7 @@ func (a *agent) run() (retErr error) {
776
776
// sending logs gets gracefulShutdownBehaviorRemain because we want to send logs generated by
777
777
// shutdown scripts.
778
778
connMan .startAgentAPI ("send logs" , gracefulShutdownBehaviorRemain ,
779
- func (ctx context.Context , aAPI proto.DRPCAgentClient23 ) error {
779
+ func (ctx context.Context , aAPI proto.DRPCAgentClient24 ) error {
780
780
err := a .logSender .SendLoop (ctx , aAPI )
781
781
if xerrors .Is (err , agentsdk .LogLimitExceededError ) {
782
782
// we don't want this error to tear down the API connection and propagate to the
@@ -813,10 +813,11 @@ func (a *agent) run() (retErr error) {
813
813
networkOK := newCheckpoint (a .logger )
814
814
manifestOK := newCheckpoint (a .logger )
815
815
816
- connMan .startAgentAPI ("handle manifest" , gracefulShutdownBehaviorStop , a .handleManifest (manifestOK ))
816
+ //connMan.startAgentAPI("handle manifest", gracefulShutdownBehaviorStop, a.handleManifest(manifestOK))
817
+ connMan .startAgentAPI ("handle manifest stream" , gracefulShutdownBehaviorStop , a .handleManifestStream (manifestOK ))
817
818
818
819
connMan .startAgentAPI ("app health reporter" , gracefulShutdownBehaviorStop ,
819
- func (ctx context.Context , aAPI proto.DRPCAgentClient23 ) error {
820
+ func (ctx context.Context , aAPI proto.DRPCAgentClient24 ) error {
820
821
if err := manifestOK .wait (ctx ); err != nil {
821
822
return xerrors .Errorf ("no manifest: %w" , err )
822
823
}
@@ -849,7 +850,7 @@ func (a *agent) run() (retErr error) {
849
850
850
851
connMan .startAgentAPI ("fetch service banner loop" , gracefulShutdownBehaviorStop , a .fetchServiceBannerLoop )
851
852
852
- connMan .startAgentAPI ("stats report loop" , gracefulShutdownBehaviorStop , func (ctx context.Context , aAPI proto.DRPCAgentClient23 ) error {
853
+ connMan .startAgentAPI ("stats report loop" , gracefulShutdownBehaviorStop , func (ctx context.Context , aAPI proto.DRPCAgentClient24 ) error {
853
854
if err := networkOK .wait (ctx ); err != nil {
854
855
return xerrors .Errorf ("no network: %w" , err )
855
856
}
@@ -860,118 +861,173 @@ func (a *agent) run() (retErr error) {
860
861
}
861
862
862
863
// handleManifest returns a function that fetches and processes the manifest
863
- func (a * agent ) handleManifest (manifestOK * checkpoint ) func (ctx context.Context , aAPI proto.DRPCAgentClient23 ) error {
864
- return func (ctx context.Context , aAPI proto.DRPCAgentClient23 ) error {
865
- var (
866
- sentResult = false
867
- err error
868
- )
864
+ func (a * agent ) handleManifest (manifestOK * checkpoint ) func (ctx context.Context , aAPI proto.DRPCAgentClient24 ) error {
865
+ return func (ctx context.Context , aAPI proto.DRPCAgentClient24 ) error {
866
+ var err error
869
867
defer func () {
870
- if ! sentResult {
868
+ if err != nil {
871
869
manifestOK .complete (err )
872
870
}
873
871
}()
872
+
874
873
mp , err := aAPI .GetManifest (ctx , & proto.GetManifestRequest {})
875
874
if err != nil {
876
- return xerrors .Errorf ("fetch metadata : %w" , err )
875
+ return xerrors .Errorf ("fetch manifest : %w" , err )
877
876
}
878
877
a .logger .Info (ctx , "fetched manifest" , slog .F ("manifest" , mp ))
879
- manifest , err := agentsdk .ManifestFromProto (mp )
880
- if err != nil {
881
- a .logger .Critical (ctx , "failed to convert manifest" , slog .F ("manifest" , mp ), slog .Error (err ))
882
- return xerrors .Errorf ("convert manifest: %w" , err )
883
- }
884
- if manifest .AgentID == uuid .Nil {
885
- return xerrors .New ("nil agentID returned by manifest" )
886
- }
887
- a .client .RewriteDERPMap (manifest .DERPMap )
878
+ return a .handleSingleManifest (ctx , aAPI , manifestOK , mp )
879
+ }
880
+ }
888
881
889
- // Expand the directory and send it back to coderd so external
890
- // applications that rely on the directory can use it.
891
- //
892
- // An example is VS Code Remote, which must know the directory
893
- // before initializing a connection.
894
- manifest .Directory , err = expandDirectory (manifest .Directory )
882
+ func (a * agent ) handleManifestStream (manifestOK * checkpoint ) func (ctx context.Context , aAPI proto.DRPCAgentClient24 ) error {
883
+ return func (ctx context.Context , aAPI proto.DRPCAgentClient24 ) error {
884
+ var err error
885
+ defer func () {
886
+ if err != nil {
887
+ manifestOK .complete (err )
888
+ }
889
+ }()
890
+
891
+ client , err := aAPI .StreamManifests (ctx , & proto.GetManifestRequest {})
895
892
if err != nil {
896
- return xerrors .Errorf ("expand directory: %w" , err )
893
+ if err == io .EOF {
894
+ a .logger .Info (ctx , "stream manifest received EOF" )
895
+ return nil
896
+ }
897
+ return xerrors .Errorf ("stream manifests: %w" , err )
897
898
}
898
- subsys , err := agentsdk .ProtoFromSubsystems (a .subsystems )
899
- if err != nil {
900
- a .logger .Critical (ctx , "failed to convert subsystems" , slog .Error (err ))
901
- return xerrors .Errorf ("failed to convert subsystems: %w" , err )
899
+
900
+ for {
901
+ a .logger .Debug (ctx , "waiting on new streamed manifest" )
902
+
903
+ manifest , err := client .Recv ()
904
+ if err != nil {
905
+ return xerrors .Errorf ("recv manifest: %w" , err )
906
+ }
907
+
908
+ a .logger .Info (ctx , "received new streamed manifest" , slog .F ("manifest" , manifest ))
909
+
910
+ err = a .handleSingleManifest (ctx , aAPI , manifestOK , manifest )
911
+ if err != nil {
912
+ return xerrors .Errorf ("handle streamed manifest: %w" , err )
913
+ }
902
914
}
903
- _ , err = aAPI .UpdateStartup (ctx , & proto.UpdateStartupRequest {Startup : & proto.Startup {
904
- Version : buildinfo .Version (),
905
- ExpandedDirectory : manifest .Directory ,
906
- Subsystems : subsys ,
907
- }})
908
- if err != nil {
909
- return xerrors .Errorf ("update workspace agent startup: %w" , err )
915
+ }
916
+ }
917
+
918
+ // TODO: change signature to just take in all inputs instead of returning closure; return error
919
+ func (a * agent ) handleSingleManifest (ctx context.Context , aAPI proto.DRPCAgentClient24 , manifestOK * checkpoint , mp * proto.Manifest ) error {
920
+ var (
921
+ sentResult bool
922
+ err error
923
+ )
924
+ defer func () {
925
+ if ! sentResult {
926
+ manifestOK .complete (err )
910
927
}
928
+ }()
911
929
912
- oldManifest := a .manifest .Swap (& manifest )
913
- manifestOK .complete (nil )
914
- sentResult = true
915
-
916
- // The startup script should only execute on the first run!
917
- if oldManifest == nil {
918
- a .setLifecycle (codersdk .WorkspaceAgentLifecycleStarting )
919
-
920
- // Perform overrides early so that Git auth can work even if users
921
- // connect to a workspace that is not yet ready. We don't run this
922
- // concurrently with the startup script to avoid conflicts between
923
- // them.
924
- if manifest .GitAuthConfigs > 0 {
925
- // If this fails, we should consider surfacing the error in the
926
- // startup log and setting the lifecycle state to be "start_error"
927
- // (after startup script completion), but for now we'll just log it.
928
- err := gitauth .OverrideVSCodeConfigs (a .filesystem )
929
- if err != nil {
930
- a .logger .Warn (ctx , "failed to override vscode git auth configs" , slog .Error (err ))
931
- }
932
- }
930
+ manifest , err := agentsdk .ManifestFromProto (mp )
931
+ if err != nil {
932
+ a .logger .Critical (ctx , "failed to convert manifest" , slog .F ("manifest" , mp ), slog .Error (err ))
933
+ return xerrors .Errorf ("convert manifest: %w" , err )
934
+ }
935
+ if manifest .AgentID == uuid .Nil {
936
+ return xerrors .New ("nil agentID returned by manifest" )
937
+ }
938
+ a .client .RewriteDERPMap (manifest .DERPMap )
933
939
934
- err = a .scriptRunner .Init (manifest .Scripts , aAPI .ScriptCompleted )
940
+ // Expand the directory and send it back to coderd so external
941
+ // applications that rely on the directory can use it.
942
+ //
943
+ // An example is VS Code Remote, which must know the directory
944
+ // before initializing a connection.
945
+ manifest .Directory , err = expandDirectory (manifest .Directory )
946
+ if err != nil {
947
+ return xerrors .Errorf ("expand directory: %w" , err )
948
+ }
949
+ subsys , err := agentsdk .ProtoFromSubsystems (a .subsystems )
950
+ if err != nil {
951
+ a .logger .Critical (ctx , "failed to convert subsystems" , slog .Error (err ))
952
+ return xerrors .Errorf ("failed to convert subsystems: %w" , err )
953
+ }
954
+ _ , err = aAPI .UpdateStartup (ctx , & proto.UpdateStartupRequest {Startup : & proto.Startup {
955
+ Version : buildinfo .Version (),
956
+ ExpandedDirectory : manifest .Directory ,
957
+ Subsystems : subsys ,
958
+ }})
959
+ if err != nil {
960
+ return xerrors .Errorf ("update workspace agent startup: %w" , err )
961
+ }
962
+
963
+ oldManifest := a .manifest .Swap (& manifest )
964
+ manifestOK .complete (nil )
965
+ sentResult = true
966
+
967
+ // TODO: remove
968
+ a .logger .Info (ctx , "NOW OWNED BY" , slog .F ("owner" , manifest .OwnerName ))
969
+
970
+ // TODO: this will probably have to change in the case of prebuilds; maybe check if owner is the same,
971
+ // or add prebuild metadata to manifest?
972
+ // The startup script should only execute on the first run!
973
+ if oldManifest == nil {
974
+ a .setLifecycle (codersdk .WorkspaceAgentLifecycleStarting )
975
+
976
+ // Perform overrides early so that Git auth can work even if users
977
+ // connect to a workspace that is not yet ready. We don't run this
978
+ // concurrently with the startup script to avoid conflicts between
979
+ // them.
980
+ if manifest .GitAuthConfigs > 0 {
981
+ // If this fails, we should consider surfacing the error in the
982
+ // startup log and setting the lifecycle state to be "start_error"
983
+ // (after startup script completion), but for now we'll just log it.
984
+ err := gitauth .OverrideVSCodeConfigs (a .filesystem )
935
985
if err != nil {
936
- return xerrors . Errorf ( "init script runner: %w " , err )
986
+ a . logger . Warn ( ctx , "failed to override vscode git auth configs " , slog . Error ( err ) )
937
987
}
938
- err = a .trackGoroutine (func () {
939
- start := time .Now ()
940
- // here we use the graceful context because the script runner is not directly tied
941
- // to the agent API.
942
- err := a .scriptRunner .Execute (a .gracefulCtx , agentscripts .ExecuteStartScripts )
943
- // Measure the time immediately after the script has finished
944
- dur := time .Since (start ).Seconds ()
945
- if err != nil {
946
- a .logger .Warn (ctx , "startup script(s) failed" , slog .Error (err ))
947
- if errors .Is (err , agentscripts .ErrTimeout ) {
948
- a .setLifecycle (codersdk .WorkspaceAgentLifecycleStartTimeout )
949
- } else {
950
- a .setLifecycle (codersdk .WorkspaceAgentLifecycleStartError )
951
- }
988
+ }
989
+
990
+ err = a .scriptRunner .Init (manifest .Scripts , aAPI .ScriptCompleted )
991
+ if err != nil {
992
+ return xerrors .Errorf ("init script runner: %w" , err )
993
+ }
994
+ err = a .trackGoroutine (func () {
995
+ start := time .Now ()
996
+ // here we use the graceful context because the script runner is not directly tied
997
+ // to the agent API.
998
+ err := a .scriptRunner .Execute (a .gracefulCtx , agentscripts .ExecuteStartScripts )
999
+ // Measure the time immediately after the script has finished
1000
+ dur := time .Since (start ).Seconds ()
1001
+ if err != nil {
1002
+ a .logger .Warn (ctx , "startup script(s) failed" , slog .Error (err ))
1003
+ if errors .Is (err , agentscripts .ErrTimeout ) {
1004
+ a .setLifecycle (codersdk .WorkspaceAgentLifecycleStartTimeout )
952
1005
} else {
953
- a .setLifecycle (codersdk .WorkspaceAgentLifecycleReady )
1006
+ a .setLifecycle (codersdk .WorkspaceAgentLifecycleStartError )
954
1007
}
1008
+ } else {
1009
+ a .setLifecycle (codersdk .WorkspaceAgentLifecycleReady )
1010
+ }
955
1011
956
- label := "false"
957
- if err == nil {
958
- label = "true"
959
- }
960
- a .metrics .startupScriptSeconds .WithLabelValues (label ).Set (dur )
961
- a .scriptRunner .StartCron ()
962
- })
963
- if err != nil {
964
- return xerrors .Errorf ("track conn goroutine: %w" , err )
1012
+ label := "false"
1013
+ if err == nil {
1014
+ label = "true"
965
1015
}
1016
+ a .metrics .startupScriptSeconds .WithLabelValues (label ).Set (dur )
1017
+ a .scriptRunner .StartCron ()
1018
+ })
1019
+ if err != nil {
1020
+ return xerrors .Errorf ("track conn goroutine: %w" , err )
966
1021
}
967
- return nil
968
1022
}
1023
+
1024
+ return nil
969
1025
}
970
1026
971
1027
// createOrUpdateNetwork waits for the manifest to be set using manifestOK, then creates or updates
972
1028
// the tailnet using the information in the manifest
973
- func (a * agent ) createOrUpdateNetwork (manifestOK , networkOK * checkpoint ) func (context.Context , proto.DRPCAgentClient23 ) error {
974
- return func (ctx context.Context , _ proto.DRPCAgentClient23 ) (retErr error ) {
1029
+ func (a * agent ) createOrUpdateNetwork (manifestOK , networkOK * checkpoint ) func (context.Context , proto.DRPCAgentClient24 ) error {
1030
+ return func (ctx context.Context , _ proto.DRPCAgentClient24 ) (retErr error ) {
975
1031
if err := manifestOK .wait (ctx ); err != nil {
976
1032
return xerrors .Errorf ("no manifest: %w" , err )
977
1033
}
@@ -1692,7 +1748,7 @@ const (
1692
1748
1693
1749
type apiConnRoutineManager struct {
1694
1750
logger slog.Logger
1695
- aAPI proto.DRPCAgentClient23
1751
+ aAPI proto.DRPCAgentClient24
1696
1752
tAPI tailnetproto.DRPCTailnetClient23
1697
1753
eg * errgroup.Group
1698
1754
stopCtx context.Context
@@ -1701,7 +1757,7 @@ type apiConnRoutineManager struct {
1701
1757
1702
1758
func newAPIConnRoutineManager (
1703
1759
gracefulCtx , hardCtx context.Context , logger slog.Logger ,
1704
- aAPI proto.DRPCAgentClient23 , tAPI tailnetproto.DRPCTailnetClient23 ,
1760
+ aAPI proto.DRPCAgentClient24 , tAPI tailnetproto.DRPCTailnetClient23 ,
1705
1761
) * apiConnRoutineManager {
1706
1762
// routines that remain in operation during graceful shutdown use the remainCtx. They'll still
1707
1763
// exit if the errgroup hits an error, which usually means a problem with the conn.
@@ -1734,7 +1790,7 @@ func newAPIConnRoutineManager(
1734
1790
// but for Tailnet.
1735
1791
func (a * apiConnRoutineManager ) startAgentAPI (
1736
1792
name string , behavior gracefulShutdownBehavior ,
1737
- f func (context.Context , proto.DRPCAgentClient23 ) error ,
1793
+ f func (context.Context , proto.DRPCAgentClient24 ) error ,
1738
1794
) {
1739
1795
logger := a .logger .With (slog .F ("name" , name ))
1740
1796
var ctx context.Context
0 commit comments