@@ -16,6 +16,7 @@ import (
16
16
"github.com/google/uuid"
17
17
"github.com/stretchr/testify/assert"
18
18
"github.com/stretchr/testify/require"
19
+ "golang.org/x/xerrors"
19
20
"tailscale.com/tailcfg"
20
21
21
22
"cdr.dev/slog"
@@ -25,7 +26,9 @@ import (
25
26
"github.com/coder/coder/v2/coderd/coderdtest"
26
27
"github.com/coder/coder/v2/coderd/database"
27
28
"github.com/coder/coder/v2/coderd/database/dbfake"
29
+ "github.com/coder/coder/v2/coderd/database/dbmem"
28
30
"github.com/coder/coder/v2/coderd/database/dbtime"
31
+ "github.com/coder/coder/v2/coderd/database/pubsub"
29
32
"github.com/coder/coder/v2/codersdk"
30
33
"github.com/coder/coder/v2/codersdk/agentsdk"
31
34
"github.com/coder/coder/v2/provisioner/echo"
@@ -1107,6 +1110,162 @@ func TestWorkspaceAgent_Metadata(t *testing.T) {
1107
1110
post ("unknown" , unknownKeyMetadata )
1108
1111
}
1109
1112
1113
+ type testWAMErrorStore struct {
1114
+ database.Store
1115
+ err atomic.Pointer [error ]
1116
+ }
1117
+
1118
+ func (s * testWAMErrorStore ) GetWorkspaceAgentMetadata (ctx context.Context , arg database.GetWorkspaceAgentMetadataParams ) ([]database.WorkspaceAgentMetadatum , error ) {
1119
+ err := s .err .Load ()
1120
+ if err != nil {
1121
+ return nil , * err
1122
+ }
1123
+ return s .Store .GetWorkspaceAgentMetadata (ctx , arg )
1124
+ }
1125
+
1126
+ func TestWorkspaceAgent_Metadata_CatchMemoryLeak (t * testing.T ) {
1127
+ t .Parallel ()
1128
+
1129
+ db := & testWAMErrorStore {Store : dbmem .New ()}
1130
+ psub := pubsub .NewInMemory ()
1131
+ logger := slogtest .Make (t , & slogtest.Options {IgnoreErrors : true }).Named ("coderd" ).Leveled (slog .LevelDebug )
1132
+ client := coderdtest .New (t , & coderdtest.Options {
1133
+ Database : db ,
1134
+ Pubsub : psub ,
1135
+ IncludeProvisionerDaemon : true ,
1136
+ Logger : & logger ,
1137
+ })
1138
+ user := coderdtest .CreateFirstUser (t , client )
1139
+ authToken := uuid .NewString ()
1140
+ ws := dbfake .Workspace (t , db , database.Workspace {
1141
+ OrganizationID : user .OrganizationID ,
1142
+ OwnerID : user .UserID ,
1143
+ })
1144
+ dbfake .WorkspaceBuild (t , db , ws , database.WorkspaceBuild {}, & proto.Resource {
1145
+ Name : "example" ,
1146
+ Type : "aws_instance" ,
1147
+ Agents : []* proto.Agent {{
1148
+ Metadata : []* proto.Agent_Metadata {
1149
+ {
1150
+ DisplayName : "First Meta" ,
1151
+ Key : "foo1" ,
1152
+ Script : "echo hi" ,
1153
+ Interval : 10 ,
1154
+ Timeout : 3 ,
1155
+ },
1156
+ {
1157
+ DisplayName : "Second Meta" ,
1158
+ Key : "foo2" ,
1159
+ Script : "echo bye" ,
1160
+ Interval : 10 ,
1161
+ Timeout : 3 ,
1162
+ },
1163
+ },
1164
+ Id : uuid .NewString (),
1165
+ Auth : & proto.Agent_Token {
1166
+ Token : authToken ,
1167
+ },
1168
+ }},
1169
+ })
1170
+ workspace , err := client .Workspace (context .Background (), ws .ID )
1171
+ require .NoError (t , err )
1172
+ for _ , res := range workspace .LatestBuild .Resources {
1173
+ for _ , a := range res .Agents {
1174
+ require .Equal (t , codersdk .WorkspaceAgentLifecycleCreated , a .LifecycleState )
1175
+ }
1176
+ }
1177
+
1178
+ agentClient := agentsdk .New (client .URL )
1179
+ agentClient .SetSessionToken (authToken )
1180
+
1181
+ ctx , cancel := context .WithCancel (testutil .Context (t , testutil .WaitSuperLong ))
1182
+
1183
+ manifest , err := agentClient .Manifest (ctx )
1184
+ require .NoError (t , err )
1185
+
1186
+ post := func (ctx context.Context , key , value string ) error {
1187
+ return agentClient .PostMetadata (ctx , agentsdk.PostMetadataRequest {
1188
+ Metadata : []agentsdk.Metadata {
1189
+ {
1190
+ Key : key ,
1191
+ WorkspaceAgentMetadataResult : codersdk.WorkspaceAgentMetadataResult {
1192
+ CollectedAt : time .Now (),
1193
+ Value : value ,
1194
+ },
1195
+ },
1196
+ },
1197
+ })
1198
+ }
1199
+
1200
+ workspace , err = client .Workspace (ctx , workspace .ID )
1201
+ require .NoError (t , err , "get workspace" )
1202
+
1203
+ // Start the SSE connection.
1204
+ metadata , errors := client .WatchWorkspaceAgentMetadata (ctx , manifest .AgentID )
1205
+
1206
+ // Discard the output, pretending to be a client consuming it.
1207
+ wantErr := xerrors .New ("test error" )
1208
+ metadataDone := testutil .Go (t , func () {
1209
+ for {
1210
+ select {
1211
+ case <- ctx .Done ():
1212
+ return
1213
+ case _ , ok := <- metadata :
1214
+ if ! ok {
1215
+ return
1216
+ }
1217
+ case err := <- errors :
1218
+ if err != nil && ! strings .Contains (err .Error (), wantErr .Error ()) {
1219
+ assert .NoError (t , err , "watch metadata" )
1220
+ }
1221
+ return
1222
+ }
1223
+ }
1224
+ })
1225
+
1226
+ postDone := testutil .Go (t , func () {
1227
+ for {
1228
+ // We need to send two separate metadata updates to trigger the
1229
+ // memory leak. foo2 will cause the number of foo1 to be doubled, etc.
1230
+ err = post (ctx , "foo1" , "hi" )
1231
+ if err != nil {
1232
+ if ! xerrors .Is (err , context .Canceled ) {
1233
+ assert .NoError (t , err , "post metadata foo1" )
1234
+ }
1235
+ return
1236
+ }
1237
+ err = post (ctx , "foo2" , "bye" )
1238
+ if err != nil {
1239
+ if ! xerrors .Is (err , context .Canceled ) {
1240
+ assert .NoError (t , err , "post metadata foo1" )
1241
+ }
1242
+ return
1243
+ }
1244
+ }
1245
+ })
1246
+
1247
+ // In a previously faulty implementation, this database error will trigger
1248
+ // a close of the goroutine that consumes metadata updates for refreshing
1249
+ // the metadata sent over SSE. As it was, the exit of the consumer was not
1250
+ // detected as a trigger to close down the connection.
1251
+ //
1252
+ // Further, there was a memory leak in the pubsub subscription that cause
1253
+ // ballooning of memory (almost double in size every received metadata).
1254
+ //
1255
+ // This db error should trigger a close of the SSE connection in the fixed
1256
+ // implementation. The memory leak should not happen in either case, but
1257
+ // testing it is not straightforward.
1258
+ db .err .Store (& wantErr )
1259
+
1260
+ select {
1261
+ case <- ctx .Done ():
1262
+ t .Fatal ("timeout waiting for SSE to close" )
1263
+ case <- metadataDone :
1264
+ }
1265
+ cancel ()
1266
+ <- postDone
1267
+ }
1268
+
1110
1269
func TestWorkspaceAgent_Startup (t * testing.T ) {
1111
1270
t .Parallel ()
1112
1271
0 commit comments