Skip to content

feat: add connection statistics for workspace agents #6469

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 29 commits into from
Mar 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
8f1f141
fix: don't make session counts cumulative
kylecarbs Mar 2, 2023
ddf9841
Add databasefake query for getting agent stats
kylecarbs Mar 2, 2023
28d6db5
Add deployment stats endpoint
kylecarbs Mar 2, 2023
29719a4
The query... works?!?
kylecarbs Mar 2, 2023
09a2dad
Fix aggregation query
kylecarbs Mar 6, 2023
12a52b1
Select from multiple tables instead
kylecarbs Mar 6, 2023
a1804a9
Fix continuous stats
kylecarbs Mar 6, 2023
93f013b
Increase period of stat refreshes
kylecarbs Mar 6, 2023
50260c3
Add workspace counts to deployment stats
kylecarbs Mar 7, 2023
d1bae99
fmt
kylecarbs Mar 7, 2023
9fe9d4c
Add a slight bit of responsiveness
kylecarbs Mar 7, 2023
00ebe2e
Fix template version editor overflow
kylecarbs Mar 7, 2023
cd76533
Add refresh button
kylecarbs Mar 7, 2023
506740b
Fix font family on button
kylecarbs Mar 7, 2023
1924f58
Merge branch 'main' into exportstats
kylecarbs Mar 7, 2023
9f00ac5
Fix latest stat being reported
kylecarbs Mar 7, 2023
4b6992c
Merge branch 'main' into exportstats
kylecarbs Mar 7, 2023
1af9f64
Revert agent conn stats
kylecarbs Mar 7, 2023
e3ca39f
Merge branch 'main' into exportstats
kylecarbs Mar 7, 2023
8ad39d6
Fix linting error
kylecarbs Mar 7, 2023
0f06b23
Fix tests
kylecarbs Mar 7, 2023
e87ba59
Fix gen
kylecarbs Mar 7, 2023
99d7d1a
Fix migrations
kylecarbs Mar 7, 2023
37ad03f
Block on sending stat updates
kylecarbs Mar 7, 2023
415d8b1
Merge branch 'main' into exportstats
kylecarbs Mar 7, 2023
0037a64
Add test fixtures
kylecarbs Mar 8, 2023
3d70b2a
Merge branch 'main' into exportstats
kylecarbs Mar 9, 2023
d708210
Fix response structure
kylecarbs Mar 9, 2023
c951d5a
make gen
kylecarbs Mar 9, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 21 additions & 18 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"os/exec"
"os/user"
"path/filepath"
"reflect"
"runtime"
"sort"
"strconv"
Expand Down Expand Up @@ -60,7 +61,7 @@ const (

// MagicSSHSessionTypeEnvironmentVariable is used to track the purpose behind an SSH connection.
// This is stripped from any commands being executed, and is counted towards connection stats.
MagicSSHSessionTypeEnvironmentVariable = "__CODER_SSH_SESSION_TYPE"
MagicSSHSessionTypeEnvironmentVariable = "CODER_SSH_SESSION_TYPE"
// MagicSSHSessionTypeVSCode is set in the SSH config by the VS Code extension to identify itself.
MagicSSHSessionTypeVSCode = "vscode"
// MagicSSHSessionTypeJetBrains is set in the SSH config by the JetBrains extension to identify itself.
Expand Down Expand Up @@ -122,9 +123,7 @@ func New(options Options) io.Closer {
tempDir: options.TempDir,
lifecycleUpdate: make(chan struct{}, 1),
lifecycleReported: make(chan codersdk.WorkspaceAgentLifecycle, 1),
// TODO: This is a temporary hack to make tests not flake.
// @kylecarbs has a better solution in here: https://github.com/coder/coder/pull/6469
connStatsChan: make(chan *agentsdk.Stats, 8),
connStatsChan: make(chan *agentsdk.Stats, 1),
}
a.init(ctx)
return a
Expand Down Expand Up @@ -159,11 +158,8 @@ type agent struct {

network *tailnet.Conn
connStatsChan chan *agentsdk.Stats
latestStat atomic.Pointer[agentsdk.Stats]

statRxPackets atomic.Int64
statRxBytes atomic.Int64
statTxPackets atomic.Int64
statTxBytes atomic.Int64
connCountVSCode atomic.Int64
connCountJetBrains atomic.Int64
connCountReconnectingPTY atomic.Int64
Expand Down Expand Up @@ -905,10 +901,13 @@ func (a *agent) handleSSHSession(session ssh.Session) (retErr error) {
switch magicType {
case MagicSSHSessionTypeVSCode:
a.connCountVSCode.Add(1)
defer a.connCountVSCode.Add(-1)
case MagicSSHSessionTypeJetBrains:
a.connCountJetBrains.Add(1)
defer a.connCountJetBrains.Add(-1)
case "":
a.connCountSSHSession.Add(1)
defer a.connCountSSHSession.Add(-1)
default:
a.logger.Warn(ctx, "invalid magic ssh session type specified", slog.F("type", magicType))
}
Expand Down Expand Up @@ -1012,6 +1011,7 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
defer conn.Close()

a.connCountReconnectingPTY.Add(1)
defer a.connCountReconnectingPTY.Add(-1)

connectionID := uuid.NewString()
logger = logger.With(slog.F("id", msg.ID), slog.F("connection_id", connectionID))
Expand Down Expand Up @@ -1210,18 +1210,15 @@ func (a *agent) startReportingConnectionStats(ctx context.Context) {
ConnectionCount: int64(len(networkStats)),
ConnectionsByProto: map[string]int64{},
}
// Tailscale resets counts on every report!
// We'd rather have these compound, like Linux does!
for conn, counts := range networkStats {
stats.ConnectionsByProto[conn.Proto.String()]++
stats.RxBytes = a.statRxBytes.Add(int64(counts.RxBytes))
stats.RxPackets = a.statRxPackets.Add(int64(counts.RxPackets))
stats.TxBytes = a.statTxBytes.Add(int64(counts.TxBytes))
stats.TxPackets = a.statTxPackets.Add(int64(counts.TxPackets))
stats.RxBytes += int64(counts.RxBytes)
stats.RxPackets += int64(counts.RxPackets)
stats.TxBytes += int64(counts.TxBytes)
stats.TxPackets += int64(counts.TxPackets)
}

// Tailscale's connection stats are not cumulative, but it makes no sense to make
// ours temporary.
// The count of active sessions.
stats.SessionCountSSH = a.connCountSSHSession.Load()
stats.SessionCountVSCode = a.connCountVSCode.Load()
stats.SessionCountJetBrains = a.connCountJetBrains.Load()
Expand Down Expand Up @@ -1270,10 +1267,16 @@ func (a *agent) startReportingConnectionStats(ctx context.Context) {
// Convert from microseconds to milliseconds.
stats.ConnectionMedianLatencyMS /= 1000

lastStat := a.latestStat.Load()
if lastStat != nil && reflect.DeepEqual(lastStat, stats) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this still confuses me a bit. If Tailscale stats aren't cumulative, isn't the only way this matches lastStat if there was no chatter (tx/rx), the latency and sessions for SSH/Code/JetBrains stayed the same?

Since we're also doing network pings in the latency check, I think there is a non-zero chance for multiple reportStats to be running concurrently, essentially competing about Load()/Store() here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, good points. I'll refactor this.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After looking at this again, it seems like this should be fine.

This will only match if there's no traffic, but that's arguably great because then we aren't spamming the database with nonsense. I don't want to do this in coderd, because we'd need to query for the last stat to ensure it's not the same.

reportStats is blocking, and so subsequent agent stat refreshes will wait before running again, so I don't think they'd compete.

Let me know if I'm overlooking something or didn't understand properly, I'm sick and my brain is stuffy right now ;p

a.logger.Info(ctx, "skipping stat because nothing changed")
return
}
a.latestStat.Store(stats)

select {
case a.connStatsChan <- stats:
default:
a.logger.Warn(ctx, "network stat dropped")
case <-a.closed:
}
}

Expand Down
106 changes: 69 additions & 37 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,10 @@ func TestAgent_Stats_SSH(t *testing.T) {
session, err := sshClient.NewSession()
require.NoError(t, err)
defer session.Close()
require.NoError(t, session.Run("echo test"))
stdin, err := session.StdinPipe()
require.NoError(t, err)
err = session.Shell()
require.NoError(t, err)

var s *agentsdk.Stats
require.Eventuallyf(t, func() bool {
Expand All @@ -78,6 +81,9 @@ func TestAgent_Stats_SSH(t *testing.T) {
}, testutil.WaitLong, testutil.IntervalFast,
"never saw stats: %+v", s,
)
_ = stdin.Close()
err = session.Wait()
require.NoError(t, err)
}

func TestAgent_Stats_ReconnectingPTY(t *testing.T) {
Expand Down Expand Up @@ -112,43 +118,69 @@ func TestAgent_Stats_ReconnectingPTY(t *testing.T) {

func TestAgent_Stats_Magic(t *testing.T) {
t.Parallel()
t.Run("StripsEnvironmentVariable", func(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
defer cancel()
//nolint:dogsled
conn, _, _, _, _ := setupAgent(t, agentsdk.Metadata{}, 0)
sshClient, err := conn.SSHClient(ctx)
require.NoError(t, err)
defer sshClient.Close()
session, err := sshClient.NewSession()
require.NoError(t, err)
session.Setenv(agent.MagicSSHSessionTypeEnvironmentVariable, agent.MagicSSHSessionTypeVSCode)
defer session.Close()

ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
defer cancel()

//nolint:dogsled
conn, _, stats, _, _ := setupAgent(t, agentsdk.Metadata{}, 0)
sshClient, err := conn.SSHClient(ctx)
require.NoError(t, err)
defer sshClient.Close()
session, err := sshClient.NewSession()
require.NoError(t, err)
session.Setenv(agent.MagicSSHSessionTypeEnvironmentVariable, agent.MagicSSHSessionTypeVSCode)
defer session.Close()

command := "sh -c 'echo $" + agent.MagicSSHSessionTypeEnvironmentVariable + "'"
expected := ""
if runtime.GOOS == "windows" {
expected = "%" + agent.MagicSSHSessionTypeEnvironmentVariable + "%"
command = "cmd.exe /c echo " + expected
}
output, err := session.Output(command)
require.NoError(t, err)
require.Equal(t, expected, strings.TrimSpace(string(output)))
var s *agentsdk.Stats
require.Eventuallyf(t, func() bool {
var ok bool
s, ok = <-stats
return ok && s.ConnectionCount > 0 && s.RxBytes > 0 && s.TxBytes > 0 &&
// Ensure that the connection didn't count as a "normal" SSH session.
// This was a special one, so it should be labeled specially in the stats!
s.SessionCountVSCode == 1 &&
// Ensure that connection latency is being counted!
// If it isn't, it's set to -1.
s.ConnectionMedianLatencyMS >= 0
}, testutil.WaitLong, testutil.IntervalFast,
"never saw stats: %+v", s,
)
command := "sh -c 'echo $" + agent.MagicSSHSessionTypeEnvironmentVariable + "'"
expected := ""
if runtime.GOOS == "windows" {
expected = "%" + agent.MagicSSHSessionTypeEnvironmentVariable + "%"
command = "cmd.exe /c echo " + expected
}
output, err := session.Output(command)
require.NoError(t, err)
require.Equal(t, expected, strings.TrimSpace(string(output)))
})
t.Run("Tracks", func(t *testing.T) {
t.Parallel()
if runtime.GOOS == "window" {
t.Skip("Sleeping for infinity doesn't work on Windows")
}
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
defer cancel()
//nolint:dogsled
conn, _, stats, _, _ := setupAgent(t, agentsdk.Metadata{}, 0)
sshClient, err := conn.SSHClient(ctx)
require.NoError(t, err)
defer sshClient.Close()
session, err := sshClient.NewSession()
require.NoError(t, err)
session.Setenv(agent.MagicSSHSessionTypeEnvironmentVariable, agent.MagicSSHSessionTypeVSCode)
defer session.Close()
stdin, err := session.StdinPipe()
require.NoError(t, err)
err = session.Shell()
require.NoError(t, err)
var s *agentsdk.Stats
require.Eventuallyf(t, func() bool {
var ok bool
s, ok = <-stats
return ok && s.ConnectionCount > 0 && s.RxBytes > 0 && s.TxBytes > 0 &&
// Ensure that the connection didn't count as a "normal" SSH session.
// This was a special one, so it should be labeled specially in the stats!
s.SessionCountVSCode == 1 &&
// Ensure that connection latency is being counted!
// If it isn't, it's set to -1.
s.ConnectionMedianLatencyMS >= 0
}, testutil.WaitLong, testutil.IntervalFast,
"never saw stats: %+v", s,
)
// The shell will automatically exit if there is no stdin!
_ = stdin.Close()
err = session.Wait()
require.NoError(t, err)
})
}

func TestAgent_SessionExec(t *testing.T) {
Expand Down
Loading