Skip to content

Commit ab3cea2

Browse files
committed
add test that can reproduce memory leak
1 parent 65a9899 commit ab3cea2

File tree

2 files changed

+204
-0
lines changed

2 files changed

+204
-0
lines changed

coderd/workspaceagents_test.go

Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/google/uuid"
1717
"github.com/stretchr/testify/assert"
1818
"github.com/stretchr/testify/require"
19+
"golang.org/x/xerrors"
1920
"tailscale.com/tailcfg"
2021

2122
"cdr.dev/slog"
@@ -25,7 +26,9 @@ import (
2526
"github.com/coder/coder/v2/coderd/coderdtest"
2627
"github.com/coder/coder/v2/coderd/database"
2728
"github.com/coder/coder/v2/coderd/database/dbfake"
29+
"github.com/coder/coder/v2/coderd/database/dbmem"
2830
"github.com/coder/coder/v2/coderd/database/dbtime"
31+
"github.com/coder/coder/v2/coderd/database/pubsub"
2932
"github.com/coder/coder/v2/codersdk"
3033
"github.com/coder/coder/v2/codersdk/agentsdk"
3134
"github.com/coder/coder/v2/provisioner/echo"
@@ -1107,6 +1110,162 @@ func TestWorkspaceAgent_Metadata(t *testing.T) {
11071110
post("unknown", unknownKeyMetadata)
11081111
}
11091112

1113+
type testWAMErrorStore struct {
1114+
database.Store
1115+
err atomic.Pointer[error]
1116+
}
1117+
1118+
func (s *testWAMErrorStore) GetWorkspaceAgentMetadata(ctx context.Context, arg database.GetWorkspaceAgentMetadataParams) ([]database.WorkspaceAgentMetadatum, error) {
1119+
err := s.err.Load()
1120+
if err != nil {
1121+
return nil, *err
1122+
}
1123+
return s.Store.GetWorkspaceAgentMetadata(ctx, arg)
1124+
}
1125+
1126+
func TestWorkspaceAgent_Metadata_CatchMemoryLeak(t *testing.T) {
1127+
t.Parallel()
1128+
1129+
db := &testWAMErrorStore{Store: dbmem.New()}
1130+
psub := pubsub.NewInMemory()
1131+
logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Named("coderd").Leveled(slog.LevelDebug)
1132+
client := coderdtest.New(t, &coderdtest.Options{
1133+
Database: db,
1134+
Pubsub: psub,
1135+
IncludeProvisionerDaemon: true,
1136+
Logger: &logger,
1137+
})
1138+
user := coderdtest.CreateFirstUser(t, client)
1139+
authToken := uuid.NewString()
1140+
ws := dbfake.Workspace(t, db, database.Workspace{
1141+
OrganizationID: user.OrganizationID,
1142+
OwnerID: user.UserID,
1143+
})
1144+
dbfake.WorkspaceBuild(t, db, ws, database.WorkspaceBuild{}, &proto.Resource{
1145+
Name: "example",
1146+
Type: "aws_instance",
1147+
Agents: []*proto.Agent{{
1148+
Metadata: []*proto.Agent_Metadata{
1149+
{
1150+
DisplayName: "First Meta",
1151+
Key: "foo1",
1152+
Script: "echo hi",
1153+
Interval: 10,
1154+
Timeout: 3,
1155+
},
1156+
{
1157+
DisplayName: "Second Meta",
1158+
Key: "foo2",
1159+
Script: "echo bye",
1160+
Interval: 10,
1161+
Timeout: 3,
1162+
},
1163+
},
1164+
Id: uuid.NewString(),
1165+
Auth: &proto.Agent_Token{
1166+
Token: authToken,
1167+
},
1168+
}},
1169+
})
1170+
workspace, err := client.Workspace(context.Background(), ws.ID)
1171+
require.NoError(t, err)
1172+
for _, res := range workspace.LatestBuild.Resources {
1173+
for _, a := range res.Agents {
1174+
require.Equal(t, codersdk.WorkspaceAgentLifecycleCreated, a.LifecycleState)
1175+
}
1176+
}
1177+
1178+
agentClient := agentsdk.New(client.URL)
1179+
agentClient.SetSessionToken(authToken)
1180+
1181+
ctx, cancel := context.WithCancel(testutil.Context(t, testutil.WaitSuperLong))
1182+
1183+
manifest, err := agentClient.Manifest(ctx)
1184+
require.NoError(t, err)
1185+
1186+
post := func(ctx context.Context, key, value string) error {
1187+
return agentClient.PostMetadata(ctx, agentsdk.PostMetadataRequest{
1188+
Metadata: []agentsdk.Metadata{
1189+
{
1190+
Key: key,
1191+
WorkspaceAgentMetadataResult: codersdk.WorkspaceAgentMetadataResult{
1192+
CollectedAt: time.Now(),
1193+
Value: value,
1194+
},
1195+
},
1196+
},
1197+
})
1198+
}
1199+
1200+
workspace, err = client.Workspace(ctx, workspace.ID)
1201+
require.NoError(t, err, "get workspace")
1202+
1203+
// Start the SSE connection.
1204+
metadata, errors := client.WatchWorkspaceAgentMetadata(ctx, manifest.AgentID)
1205+
1206+
// Discard the output, pretending to be a client consuming it.
1207+
wantErr := xerrors.New("test error")
1208+
metadataDone := testutil.Go(t, func() {
1209+
for {
1210+
select {
1211+
case <-ctx.Done():
1212+
return
1213+
case _, ok := <-metadata:
1214+
if !ok {
1215+
return
1216+
}
1217+
case err := <-errors:
1218+
if err != nil && !strings.Contains(err.Error(), wantErr.Error()) {
1219+
assert.NoError(t, err, "watch metadata")
1220+
}
1221+
return
1222+
}
1223+
}
1224+
})
1225+
1226+
postDone := testutil.Go(t, func() {
1227+
for {
1228+
// We need to send two separate metadata updates to trigger the
1229+
// memory leak. foo2 will cause the number of foo1 to be doubled, etc.
1230+
err = post(ctx, "foo1", "hi")
1231+
if err != nil {
1232+
if !xerrors.Is(err, context.Canceled) {
1233+
assert.NoError(t, err, "post metadata foo1")
1234+
}
1235+
return
1236+
}
1237+
err = post(ctx, "foo2", "bye")
1238+
if err != nil {
1239+
if !xerrors.Is(err, context.Canceled) {
1240+
assert.NoError(t, err, "post metadata foo1")
1241+
}
1242+
return
1243+
}
1244+
}
1245+
})
1246+
1247+
// In a previously faulty implementation, this database error will trigger
1248+
// a close of the goroutine that consumes metadata updates for refreshing
1249+
// the metadata sent over SSE. As it was, the exit of the consumer was not
1250+
// detected as a trigger to close down the connection.
1251+
//
1252+
// Further, there was a memory leak in the pubsub subscription that cause
1253+
// ballooning of memory (almost double in size every received metadata).
1254+
//
1255+
// This db error should trigger a close of the SSE connection in the fixed
1256+
// implementation. The memory leak should not happen in either case, but
1257+
// testing it is not straightforward.
1258+
db.err.Store(&wantErr)
1259+
1260+
select {
1261+
case <-ctx.Done():
1262+
t.Fatal("timeout waiting for SSE to close")
1263+
case <-metadataDone:
1264+
}
1265+
cancel()
1266+
<-postDone
1267+
}
1268+
11101269
func TestWorkspaceAgent_Startup(t *testing.T) {
11111270
t.Parallel()
11121271

testutil/go.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package testutil
2+
3+
import (
4+
"context"
5+
"testing"
6+
)
7+
8+
// Go runs fn in a goroutine and waits until fn has completed before
9+
// test completion. Done is returned for optionally waiting for fn to
10+
// exit.
11+
func Go(t *testing.T, fn func()) (done <-chan struct{}) {
12+
t.Helper()
13+
14+
doneC := make(chan struct{})
15+
t.Cleanup(func() {
16+
<-doneC
17+
})
18+
go func() {
19+
fn()
20+
close(doneC)
21+
}()
22+
23+
return doneC
24+
}
25+
26+
// GoContext runs fn in a goroutine passing a context that will be
27+
// canceled on test completion and wait until fn has finished executing.
28+
// Done and cancel are returned for optionally waiting until completion
29+
// or early cancellation.
30+
func GoContext(t *testing.T, fn func(context.Context)) (done <-chan struct{}, cancel context.CancelFunc) {
31+
t.Helper()
32+
33+
ctx, cancel := context.WithCancel(context.Background())
34+
doneC := make(chan struct{})
35+
t.Cleanup(func() {
36+
cancel()
37+
<-done
38+
})
39+
go func() {
40+
fn(ctx)
41+
close(doneC)
42+
}()
43+
44+
return doneC, cancel
45+
}

0 commit comments

Comments
 (0)