Skip to content

Commit 701c1b6

Browse files
committed
chore: send workspace pubsub events by owner id
1 parent b22bd81 commit 701c1b6

18 files changed

+355
-199
lines changed

coderd/agentapi/api.go

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,9 @@ type API struct {
5252
var _ agentproto.DRPCAgentServer = &API{}
5353

5454
type Options struct {
55-
AgentID uuid.UUID
55+
AgentID uuid.UUID
56+
OwnerID uuid.UUID
57+
WorkspaceID uuid.UUID
5658

5759
Ctx context.Context
5860
Log slog.Logger
@@ -62,7 +64,7 @@ type Options struct {
6264
TailnetCoordinator *atomic.Pointer[tailnet.Coordinator]
6365
StatsReporter *workspacestats.Reporter
6466
AppearanceFetcher *atomic.Pointer[appearance.Fetcher]
65-
PublishWorkspaceUpdateFn func(ctx context.Context, workspaceID uuid.UUID)
67+
PublishWorkspaceUpdateFn func(ctx context.Context, userID uuid.UUID, event codersdk.WorkspaceEvent)
6668
PublishWorkspaceAgentLogsUpdateFn func(ctx context.Context, workspaceAgentID uuid.UUID, msg agentsdk.LogsNotifyMessage)
6769
NetworkTelemetryHandler func(batch []*tailnetproto.TelemetryEvent)
6870

@@ -75,10 +77,6 @@ type Options struct {
7577
ExternalAuthConfigs []*externalauth.Config
7678
Experiments codersdk.Experiments
7779

78-
// Optional:
79-
// WorkspaceID avoids a future lookup to find the workspace ID by setting
80-
// the cache in advance.
81-
WorkspaceID uuid.UUID
8280
UpdateAgentMetricsFn func(ctx context.Context, labels prometheusmetrics.AgentMetricLabels, metrics []*agentproto.Stats_Metric)
8381
}
8482

@@ -98,16 +96,7 @@ func New(opts Options) *API {
9896
AgentFn: api.agent,
9997
Database: opts.Database,
10098
DerpMapFn: opts.DerpMapFn,
101-
WorkspaceIDFn: func(ctx context.Context, wa *database.WorkspaceAgent) (uuid.UUID, error) {
102-
if opts.WorkspaceID != uuid.Nil {
103-
return opts.WorkspaceID, nil
104-
}
105-
ws, err := opts.Database.GetWorkspaceByAgentID(ctx, wa.ID)
106-
if err != nil {
107-
return uuid.Nil, err
108-
}
109-
return ws.Workspace.ID, nil
110-
},
99+
WorkspaceID: opts.WorkspaceID,
111100
}
112101

113102
api.AnnouncementBannerAPI = &AnnouncementBannerAPI{
@@ -125,7 +114,7 @@ func New(opts Options) *API {
125114

126115
api.LifecycleAPI = &LifecycleAPI{
127116
AgentFn: api.agent,
128-
WorkspaceIDFn: api.workspaceID,
117+
WorkspaceID: opts.WorkspaceID,
129118
Database: opts.Database,
130119
Log: opts.Log,
131120
PublishWorkspaceUpdateFn: api.publishWorkspaceUpdate,
@@ -242,6 +231,11 @@ func (a *API) publishWorkspaceUpdate(ctx context.Context, agent *database.Worksp
242231
return err
243232
}
244233

245-
a.opts.PublishWorkspaceUpdateFn(ctx, workspaceID)
234+
a.opts.PublishWorkspaceUpdateFn(ctx, a.opts.OwnerID, codersdk.WorkspaceEvent{
235+
Kind: codersdk.WorkspaceEventKindAgentUpdate,
236+
WorkspaceID: workspaceID,
237+
AgentID: &agent.ID,
238+
AgentName: &agent.Name,
239+
})
246240
return nil
247241
}

coderd/agentapi/lifecycle.go

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ func WithAPIVersion(ctx context.Context, version string) context.Context {
2525

2626
type LifecycleAPI struct {
2727
AgentFn func(context.Context) (database.WorkspaceAgent, error)
28-
WorkspaceIDFn func(context.Context, *database.WorkspaceAgent) (uuid.UUID, error)
28+
WorkspaceID uuid.UUID
2929
Database database.Store
3030
Log slog.Logger
3131
PublishWorkspaceUpdateFn func(context.Context, *database.WorkspaceAgent) error
@@ -45,13 +45,9 @@ func (a *LifecycleAPI) UpdateLifecycle(ctx context.Context, req *agentproto.Upda
4545
if err != nil {
4646
return nil, err
4747
}
48-
workspaceID, err := a.WorkspaceIDFn(ctx, &workspaceAgent)
49-
if err != nil {
50-
return nil, err
51-
}
5248

5349
logger := a.Log.With(
54-
slog.F("workspace_id", workspaceID),
50+
slog.F("workspace_id", a.WorkspaceID),
5551
slog.F("payload", req),
5652
)
5753
logger.Debug(ctx, "workspace agent state report")
@@ -140,15 +136,11 @@ func (a *LifecycleAPI) UpdateStartup(ctx context.Context, req *agentproto.Update
140136
if err != nil {
141137
return nil, err
142138
}
143-
workspaceID, err := a.WorkspaceIDFn(ctx, &workspaceAgent)
144-
if err != nil {
145-
return nil, err
146-
}
147139

148140
a.Log.Debug(
149141
ctx,
150142
"post workspace agent version",
151-
slog.F("workspace_id", workspaceID),
143+
slog.F("workspace_id", a.WorkspaceID),
152144
slog.F("agent_version", req.Startup.Version),
153145
)
154146

coderd/agentapi/lifecycle_test.go

Lines changed: 25 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -69,11 +69,9 @@ func TestUpdateLifecycle(t *testing.T) {
6969
AgentFn: func(ctx context.Context) (database.WorkspaceAgent, error) {
7070
return agentCreated, nil
7171
},
72-
WorkspaceIDFn: func(ctx context.Context, agent *database.WorkspaceAgent) (uuid.UUID, error) {
73-
return workspaceID, nil
74-
},
75-
Database: dbM,
76-
Log: slogtest.Make(t, nil),
72+
WorkspaceID: workspaceID,
73+
Database: dbM,
74+
Log: slogtest.Make(t, nil),
7775
PublishWorkspaceUpdateFn: func(ctx context.Context, agent *database.WorkspaceAgent) error {
7876
publishCalled = true
7977
return nil
@@ -111,11 +109,9 @@ func TestUpdateLifecycle(t *testing.T) {
111109
AgentFn: func(ctx context.Context) (database.WorkspaceAgent, error) {
112110
return agentStarting, nil
113111
},
114-
WorkspaceIDFn: func(ctx context.Context, agent *database.WorkspaceAgent) (uuid.UUID, error) {
115-
return workspaceID, nil
116-
},
117-
Database: dbM,
118-
Log: slogtest.Make(t, nil),
112+
WorkspaceID: workspaceID,
113+
Database: dbM,
114+
Log: slogtest.Make(t, nil),
119115
// Test that nil publish fn works.
120116
PublishWorkspaceUpdateFn: nil,
121117
}
@@ -156,11 +152,9 @@ func TestUpdateLifecycle(t *testing.T) {
156152
AgentFn: func(ctx context.Context) (database.WorkspaceAgent, error) {
157153
return agentCreated, nil
158154
},
159-
WorkspaceIDFn: func(ctx context.Context, agent *database.WorkspaceAgent) (uuid.UUID, error) {
160-
return workspaceID, nil
161-
},
162-
Database: dbM,
163-
Log: slogtest.Make(t, nil),
155+
WorkspaceID: workspaceID,
156+
Database: dbM,
157+
Log: slogtest.Make(t, nil),
164158
PublishWorkspaceUpdateFn: func(ctx context.Context, agent *database.WorkspaceAgent) error {
165159
publishCalled = true
166160
return nil
@@ -204,9 +198,7 @@ func TestUpdateLifecycle(t *testing.T) {
204198
AgentFn: func(ctx context.Context) (database.WorkspaceAgent, error) {
205199
return agentCreated, nil
206200
},
207-
WorkspaceIDFn: func(ctx context.Context, agent *database.WorkspaceAgent) (uuid.UUID, error) {
208-
return workspaceID, nil
209-
},
201+
WorkspaceID: workspaceID,
210202
Database: dbM,
211203
Log: slogtest.Make(t, nil),
212204
PublishWorkspaceUpdateFn: nil,
@@ -239,11 +231,9 @@ func TestUpdateLifecycle(t *testing.T) {
239231
AgentFn: func(ctx context.Context) (database.WorkspaceAgent, error) {
240232
return agent, nil
241233
},
242-
WorkspaceIDFn: func(ctx context.Context, agent *database.WorkspaceAgent) (uuid.UUID, error) {
243-
return workspaceID, nil
244-
},
245-
Database: dbM,
246-
Log: slogtest.Make(t, nil),
234+
WorkspaceID: workspaceID,
235+
Database: dbM,
236+
Log: slogtest.Make(t, nil),
247237
PublishWorkspaceUpdateFn: func(ctx context.Context, agent *database.WorkspaceAgent) error {
248238
atomic.AddInt64(&publishCalled, 1)
249239
return nil
@@ -314,11 +304,9 @@ func TestUpdateLifecycle(t *testing.T) {
314304
AgentFn: func(ctx context.Context) (database.WorkspaceAgent, error) {
315305
return agentCreated, nil
316306
},
317-
WorkspaceIDFn: func(ctx context.Context, agent *database.WorkspaceAgent) (uuid.UUID, error) {
318-
return workspaceID, nil
319-
},
320-
Database: dbM,
321-
Log: slogtest.Make(t, nil),
307+
WorkspaceID: workspaceID,
308+
Database: dbM,
309+
Log: slogtest.Make(t, nil),
322310
PublishWorkspaceUpdateFn: func(ctx context.Context, agent *database.WorkspaceAgent) error {
323311
publishCalled = true
324312
return nil
@@ -354,11 +342,9 @@ func TestUpdateStartup(t *testing.T) {
354342
AgentFn: func(ctx context.Context) (database.WorkspaceAgent, error) {
355343
return agent, nil
356344
},
357-
WorkspaceIDFn: func(ctx context.Context, agent *database.WorkspaceAgent) (uuid.UUID, error) {
358-
return workspaceID, nil
359-
},
360-
Database: dbM,
361-
Log: slogtest.Make(t, nil),
345+
WorkspaceID: workspaceID,
346+
Database: dbM,
347+
Log: slogtest.Make(t, nil),
362348
// Not used by UpdateStartup.
363349
PublishWorkspaceUpdateFn: nil,
364350
}
@@ -402,11 +388,9 @@ func TestUpdateStartup(t *testing.T) {
402388
AgentFn: func(ctx context.Context) (database.WorkspaceAgent, error) {
403389
return agent, nil
404390
},
405-
WorkspaceIDFn: func(ctx context.Context, agent *database.WorkspaceAgent) (uuid.UUID, error) {
406-
return workspaceID, nil
407-
},
408-
Database: dbM,
409-
Log: slogtest.Make(t, nil),
391+
WorkspaceID: workspaceID,
392+
Database: dbM,
393+
Log: slogtest.Make(t, nil),
410394
// Not used by UpdateStartup.
411395
PublishWorkspaceUpdateFn: nil,
412396
}
@@ -435,11 +419,9 @@ func TestUpdateStartup(t *testing.T) {
435419
AgentFn: func(ctx context.Context) (database.WorkspaceAgent, error) {
436420
return agent, nil
437421
},
438-
WorkspaceIDFn: func(ctx context.Context, agent *database.WorkspaceAgent) (uuid.UUID, error) {
439-
return workspaceID, nil
440-
},
441-
Database: dbM,
442-
Log: slogtest.Make(t, nil),
422+
WorkspaceID: workspaceID,
423+
Database: dbM,
424+
Log: slogtest.Make(t, nil),
443425
// Not used by UpdateStartup.
444426
PublishWorkspaceUpdateFn: nil,
445427
}

coderd/agentapi/manifest.go

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,23 +29,18 @@ type ManifestAPI struct {
2929
ExternalAuthConfigs []*externalauth.Config
3030
DisableDirectConnections bool
3131
DerpForceWebSockets bool
32+
WorkspaceID uuid.UUID
3233

33-
AgentFn func(context.Context) (database.WorkspaceAgent, error)
34-
WorkspaceIDFn func(context.Context, *database.WorkspaceAgent) (uuid.UUID, error)
35-
Database database.Store
36-
DerpMapFn func() *tailcfg.DERPMap
34+
AgentFn func(context.Context) (database.WorkspaceAgent, error)
35+
Database database.Store
36+
DerpMapFn func() *tailcfg.DERPMap
3737
}
3838

3939
func (a *ManifestAPI) GetManifest(ctx context.Context, _ *agentproto.GetManifestRequest) (*agentproto.Manifest, error) {
4040
workspaceAgent, err := a.AgentFn(ctx)
4141
if err != nil {
4242
return nil, err
4343
}
44-
workspaceID, err := a.WorkspaceIDFn(ctx, &workspaceAgent)
45-
if err != nil {
46-
return nil, err
47-
}
48-
4944
var (
5045
dbApps []database.WorkspaceApp
5146
scripts []database.WorkspaceAgentScript
@@ -75,7 +70,7 @@ func (a *ManifestAPI) GetManifest(ctx context.Context, _ *agentproto.GetManifest
7570
return err
7671
})
7772
eg.Go(func() (err error) {
78-
workspace, err = a.Database.GetWorkspaceByID(ctx, workspaceID)
73+
workspace, err = a.Database.GetWorkspaceByID(ctx, a.WorkspaceID)
7974
if err != nil {
8075
return xerrors.Errorf("getting workspace by id: %w", err)
8176
}

coderd/agentapi/manifest_test.go

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -288,11 +288,9 @@ func TestGetManifest(t *testing.T) {
288288
AgentFn: func(ctx context.Context) (database.WorkspaceAgent, error) {
289289
return agent, nil
290290
},
291-
WorkspaceIDFn: func(ctx context.Context, _ *database.WorkspaceAgent) (uuid.UUID, error) {
292-
return workspace.ID, nil
293-
},
294-
Database: mDB,
295-
DerpMapFn: derpMapFn,
291+
WorkspaceID: workspace.ID,
292+
Database: mDB,
293+
DerpMapFn: derpMapFn,
296294
}
297295

298296
mDB.EXPECT().GetWorkspaceAppsByAgentID(gomock.Any(), agent.ID).Return(apps, nil)
@@ -355,11 +353,9 @@ func TestGetManifest(t *testing.T) {
355353
AgentFn: func(ctx context.Context) (database.WorkspaceAgent, error) {
356354
return agent, nil
357355
},
358-
WorkspaceIDFn: func(ctx context.Context, _ *database.WorkspaceAgent) (uuid.UUID, error) {
359-
return workspace.ID, nil
360-
},
361-
Database: mDB,
362-
DerpMapFn: derpMapFn,
356+
WorkspaceID: workspace.ID,
357+
Database: mDB,
358+
DerpMapFn: derpMapFn,
363359
}
364360

365361
mDB.EXPECT().GetWorkspaceAppsByAgentID(gomock.Any(), agent.ID).Return(apps, nil)

coderd/agentapi/stats_test.go

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -148,12 +148,15 @@ func TestUpdateStates(t *testing.T) {
148148
dbM.EXPECT().GetUserByID(gomock.Any(), user.ID).Return(user, nil)
149149

150150
// Ensure that pubsub notifications are sent.
151-
notifyDescription := make(chan []byte)
152-
ps.Subscribe(codersdk.WorkspaceNotifyChannel(workspace.ID), func(_ context.Context, description []byte) {
153-
go func() {
154-
notifyDescription <- description
155-
}()
156-
})
151+
notifyDescription := make(chan struct{})
152+
ps.Subscribe(codersdk.WorkspaceEventChannel(workspace.OwnerID),
153+
codersdk.HandleWorkspaceEvent(func(_ context.Context, e codersdk.WorkspaceEvent) {
154+
if e.Kind == codersdk.WorkspaceEventKindUpdatedStats && e.WorkspaceID == workspace.ID {
155+
go func() {
156+
notifyDescription <- struct{}{}
157+
}()
158+
}
159+
}))
157160

158161
resp, err := api.UpdateStats(context.Background(), req)
159162
require.NoError(t, err)
@@ -175,7 +178,7 @@ func TestUpdateStates(t *testing.T) {
175178
case <-ctx.Done():
176179
t.Error("timed out while waiting for pubsub notification")
177180
case description := <-notifyDescription:
178-
require.Equal(t, description, []byte{})
181+
require.Equal(t, description, struct{}{})
179182
}
180183
require.True(t, updateAgentMetricsFnCalled)
181184
})
@@ -482,12 +485,15 @@ func TestUpdateStates(t *testing.T) {
482485
dbM.EXPECT().GetUserByID(gomock.Any(), user.ID).Return(user, nil)
483486

484487
// Ensure that pubsub notifications are sent.
485-
notifyDescription := make(chan []byte)
486-
ps.Subscribe(codersdk.WorkspaceNotifyChannel(workspace.ID), func(_ context.Context, description []byte) {
487-
go func() {
488-
notifyDescription <- description
489-
}()
490-
})
488+
notifyDescription := make(chan struct{})
489+
ps.Subscribe(codersdk.WorkspaceEventChannel(workspace.OwnerID),
490+
codersdk.HandleWorkspaceEvent(func(_ context.Context, e codersdk.WorkspaceEvent) {
491+
if e.Kind == codersdk.WorkspaceEventKindUpdatedStats && e.WorkspaceID == workspace.ID {
492+
go func() {
493+
notifyDescription <- struct{}{}
494+
}()
495+
}
496+
}))
491497

492498
resp, err := api.UpdateStats(context.Background(), req)
493499
require.NoError(t, err)
@@ -507,7 +513,7 @@ func TestUpdateStates(t *testing.T) {
507513
case <-ctx.Done():
508514
t.Error("timed out while waiting for pubsub notification")
509515
case description := <-notifyDescription:
510-
require.Equal(t, description, []byte{})
516+
require.Equal(t, description, struct{}{})
511517
}
512518
require.True(t, updateAgentMetricsFnCalled)
513519
})

coderd/database/dbfake/dbfake.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/coder/coder/v2/coderd/provisionerdserver"
2020
"github.com/coder/coder/v2/coderd/rbac"
2121
"github.com/coder/coder/v2/coderd/telemetry"
22+
"github.com/coder/coder/v2/coderd/util/ptr"
2223
"github.com/coder/coder/v2/codersdk"
2324
"github.com/coder/coder/v2/provisionersdk"
2425
sdkproto "github.com/coder/coder/v2/provisionersdk/proto"
@@ -225,7 +226,14 @@ func (b WorkspaceBuildBuilder) Do() WorkspaceResponse {
225226
_ = dbgen.WorkspaceBuildParameters(b.t, b.db, b.params)
226227

227228
if b.ps != nil {
228-
err = b.ps.Publish(codersdk.WorkspaceNotifyChannel(resp.Build.WorkspaceID), []byte{})
229+
msg, err := json.Marshal(codersdk.WorkspaceEvent{
230+
Kind: codersdk.WorkspaceEventKindStateChange,
231+
WorkspaceID: resp.Workspace.ID,
232+
Transition: ptr.Ref(codersdk.WorkspaceTransition(resp.Build.Transition)),
233+
JobStatus: ptr.Ref(codersdk.ProvisionerJobStatus(job.JobStatus)),
234+
})
235+
require.NoError(b.t, err)
236+
err = b.ps.Publish(codersdk.WorkspaceEventChannel(resp.Build.InitiatorID), msg)
229237
require.NoError(b.t, err)
230238
}
231239

0 commit comments

Comments
 (0)