-
Notifications
You must be signed in to change notification settings - Fork 943
feat(agent): add connection reporting for SSH and reconnecting PTY #16652
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
58463c6
a77ceac
17ddb8e
82b6fab
4166c29
0318212
d47e8a4
601b6f4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
Updates #15139
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,6 +27,7 @@ import ( | |
"golang.org/x/exp/slices" | ||
"golang.org/x/sync/errgroup" | ||
"golang.org/x/xerrors" | ||
"google.golang.org/protobuf/types/known/timestamppb" | ||
"tailscale.com/net/speedtest" | ||
"tailscale.com/tailcfg" | ||
"tailscale.com/types/netlogtype" | ||
|
@@ -174,6 +175,7 @@ func New(options Options) Agent { | |
lifecycleUpdate: make(chan struct{}, 1), | ||
lifecycleReported: make(chan codersdk.WorkspaceAgentLifecycle, 1), | ||
lifecycleStates: []agentsdk.PostLifecycleRequest{{State: codersdk.WorkspaceAgentLifecycleCreated}}, | ||
reportConnectionsUpdate: make(chan struct{}, 1), | ||
ignorePorts: options.IgnorePorts, | ||
portCacheDuration: options.PortCacheDuration, | ||
reportMetadataInterval: options.ReportMetadataInterval, | ||
|
@@ -247,6 +249,10 @@ type agent struct { | |
lifecycleStates []agentsdk.PostLifecycleRequest | ||
lifecycleLastReportedIndex int // Keeps track of the last lifecycle state we successfully reported. | ||
|
||
reportConnectionsUpdate chan struct{} | ||
reportConnectionsMu sync.Mutex | ||
reportConnections []*proto.ReportConnectionRequest | ||
|
||
network *tailnet.Conn | ||
statsReporter *statsReporter | ||
logSender *agentsdk.LogSender | ||
|
@@ -272,6 +278,24 @@ func (a *agent) init() { | |
UpdateEnv: a.updateCommandEnv, | ||
WorkingDirectory: func() string { return a.manifest.Load().Directory }, | ||
BlockFileTransfer: a.blockFileTransfer, | ||
ReportConnection: func(id uuid.UUID, magicType agentssh.MagicSessionType, ip string) func(code int, reason string) { | ||
var connectionType proto.Connection_Type | ||
switch magicType { | ||
case agentssh.MagicSessionTypeSSH: | ||
connectionType = proto.Connection_SSH | ||
case agentssh.MagicSessionTypeVSCode: | ||
connectionType = proto.Connection_VSCODE | ||
case agentssh.MagicSessionTypeJetBrains: | ||
connectionType = proto.Connection_JETBRAINS | ||
case agentssh.MagicSessionTypeUnknown: | ||
connectionType = proto.Connection_TYPE_UNSPECIFIED | ||
default: | ||
a.logger.Error(a.hardCtx, "unhandled magic session type when reporting connection", slog.F("magic_type", magicType)) | ||
connectionType = proto.Connection_TYPE_UNSPECIFIED | ||
} | ||
|
||
return a.reportConnection(id, connectionType, ip) | ||
}, | ||
}) | ||
if err != nil { | ||
panic(err) | ||
|
@@ -294,6 +318,9 @@ func (a *agent) init() { | |
a.reconnectingPTYServer = reconnectingpty.NewServer( | ||
a.logger.Named("reconnecting-pty"), | ||
a.sshServer, | ||
func(id uuid.UUID, ip string) func(code int, reason string) { | ||
return a.reportConnection(id, proto.Connection_RECONNECTING_PTY, ip) | ||
}, | ||
a.metrics.connectionsTotal, a.metrics.reconnectingPTYErrors, | ||
a.reconnectingPTYTimeout, | ||
) | ||
|
@@ -703,6 +730,91 @@ func (a *agent) setLifecycle(state codersdk.WorkspaceAgentLifecycle) { | |
} | ||
} | ||
|
||
// reportConnectionsLoop reports connections to the agent for auditing. | ||
func (a *agent) reportConnectionsLoop(ctx context.Context, aAPI proto.DRPCAgentClient24) error { | ||
for { | ||
select { | ||
case <-a.reportConnectionsUpdate: | ||
case <-ctx.Done(): | ||
return ctx.Err() | ||
} | ||
|
||
for { | ||
a.reportConnectionsMu.Lock() | ||
if len(a.reportConnections) == 0 { | ||
a.reportConnectionsMu.Unlock() | ||
break | ||
} | ||
payload := a.reportConnections[0] | ||
a.reportConnectionsMu.Unlock() | ||
|
||
logger := a.logger.With(slog.F("payload", payload)) | ||
logger.Debug(ctx, "reporting connection") | ||
_, err := aAPI.ReportConnection(ctx, payload) | ||
if err != nil { | ||
return xerrors.Errorf("failed to report connection: %w", err) | ||
} | ||
|
||
logger.Debug(ctx, "successfully reported connection") | ||
|
||
a.reportConnectionsMu.Lock() | ||
mafredri marked this conversation as resolved.
Show resolved
Hide resolved
|
||
a.reportConnections = a.reportConnections[1:] | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why not just make this a channel? This slice behavior is correct, just feels like a weaker implementation of a channel. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I guess it could be a channel, but how big to make it? How many in-flight reports is "too many"? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. That's fair, a channel would not be a terrible option here. It requires upfront allocation which can either be a good or a bad thing in memory constrained systems. For now I've limited this to 2048 reports pending, or about 300KB. We can revisit this later if needed. |
||
count := len(a.reportConnections) | ||
a.reportConnectionsMu.Unlock() | ||
|
||
if count == 0 { | ||
break | ||
} | ||
mafredri marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} | ||
} | ||
|
||
func (a *agent) reportConnection(id uuid.UUID, connectionType proto.Connection_Type, ip string) (disconnected func(code int, reason string)) { | ||
// Remove the port from the IP. | ||
if portIndex := strings.LastIndex(ip, ":"); portIndex != -1 { | ||
ip = ip[:portIndex] | ||
ip = strings.Trim(ip, "[]") // IPv6 addresses are wrapped in brackets. | ||
} | ||
mafredri marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
a.reportConnectionsMu.Lock() | ||
defer a.reportConnectionsMu.Unlock() | ||
a.reportConnections = append(a.reportConnections, &proto.ReportConnectionRequest{ | ||
Connection: &proto.Connection{ | ||
Id: id[:], | ||
Action: proto.Connection_CONNECT, | ||
Type: connectionType, | ||
Timestamp: timestamppb.New(time.Now()), | ||
Ip: ip, | ||
StatusCode: 0, | ||
Reason: nil, | ||
}, | ||
}) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Do we have to worry about the size of this slice? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Typically no. The only way these accumulate is if we have lost connection to coderd. I could add some safeguards around this to drop messages, but since it's part of auditing that feels a bit wrong. WDYT? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I'll defer to your judgment. If you feel it is relatively bounded, then it's good with me. Maybe leave a comment of the assumptions? I did not spend that long understanding the full context of the code. |
||
select { | ||
case a.reportConnectionsUpdate <- struct{}{}: | ||
default: | ||
} | ||
|
||
return func(code int, reason string) { | ||
a.reportConnectionsMu.Lock() | ||
defer a.reportConnectionsMu.Unlock() | ||
a.reportConnections = append(a.reportConnections, &proto.ReportConnectionRequest{ | ||
Connection: &proto.Connection{ | ||
Id: id[:], | ||
Action: proto.Connection_DISCONNECT, | ||
Type: connectionType, | ||
Timestamp: timestamppb.New(time.Now()), | ||
Ip: ip, | ||
StatusCode: int32(code), //nolint:gosec | ||
Reason: &reason, | ||
}, | ||
}) | ||
select { | ||
case a.reportConnectionsUpdate <- struct{}{}: | ||
default: | ||
} | ||
} | ||
} | ||
|
||
// fetchServiceBannerLoop fetches the service banner on an interval. It will | ||
// not be fetched immediately; the expectation is that it is primed elsewhere | ||
// (and must be done before the session actually starts). | ||
|
@@ -813,6 +925,10 @@ func (a *agent) run() (retErr error) { | |
return resourcesmonitor.Start(ctx) | ||
}) | ||
|
||
// Connection reports are part of auditing, we should keep sending them via | ||
// gracefulShutdownBehaviorRemain. | ||
connMan.startAgentAPI("report connections", gracefulShutdownBehaviorRemain, a.reportConnectionsLoop) | ||
|
||
// channels to sync goroutines below | ||
// handle manifest | ||
// | | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need a label here for clarity?
break
will always break the innermost loop but I always have trouble remembering that personally.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you review in VS Code, the syntax highlighting can be helpful here!

There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I only use labels when I need to. I don't think we have examples in our code for labels breaking the inner loop. Do we?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You wrote one a while back ;-) https://github.com/coder/coder/pull/14578/files#diff-f8b1ec0d615f1374c7f54e81c7871b337f92f8749e5608a155227c71160fafc8R48
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You got me 😄 ❤️