Skip to content

Commit f6d811f

Browse files
committed
use subscribewitherr
1 parent d41e0b0 commit f6d811f

File tree

5 files changed

+55
-39
lines changed

5 files changed

+55
-39
lines changed

coderd/agentapi/stats_test.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,6 @@ import (
1313
"go.uber.org/mock/gomock"
1414
"google.golang.org/protobuf/types/known/durationpb"
1515

16-
"cdr.dev/slog/sloggers/slogtest"
17-
1816
agentproto "github.com/coder/coder/v2/agent/proto"
1917
"github.com/coder/coder/v2/coderd/agentapi"
2018
"github.com/coder/coder/v2/coderd/database"
@@ -150,10 +148,12 @@ func TestUpdateStates(t *testing.T) {
150148

151149
// Ensure that pubsub notifications are sent.
152150
notifyDescription := make(chan struct{})
153-
ps.Subscribe(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
151+
ps.SubscribeWithErr(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
154152
wspubsub.HandleWorkspaceEvent(
155-
slogtest.Make(t, nil),
156-
func(_ context.Context, e wspubsub.WorkspaceEvent) {
153+
func(_ context.Context, e wspubsub.WorkspaceEvent, err error) {
154+
if err != nil {
155+
return
156+
}
157157
if e.Kind == wspubsub.WorkspaceEventKindStatsUpdate && e.WorkspaceID == workspace.ID {
158158
go func() {
159159
notifyDescription <- struct{}{}
@@ -479,10 +479,12 @@ func TestUpdateStates(t *testing.T) {
479479

480480
// Ensure that pubsub notifications are sent.
481481
notifyDescription := make(chan struct{})
482-
ps.Subscribe(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
482+
ps.SubscribeWithErr(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
483483
wspubsub.HandleWorkspaceEvent(
484-
slogtest.Make(t, nil),
485-
func(_ context.Context, e wspubsub.WorkspaceEvent) {
484+
func(_ context.Context, e wspubsub.WorkspaceEvent, err error) {
485+
if err != nil {
486+
return
487+
}
486488
if e.Kind == wspubsub.WorkspaceEventKindStatsUpdate && e.WorkspaceID == workspace.ID {
487489
go func() {
488490
notifyDescription <- struct{}{}

coderd/provisionerdserver/provisionerdserver_test.go

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -296,10 +296,12 @@ func TestAcquireJob(t *testing.T) {
296296

297297
startPublished := make(chan struct{})
298298
var closed bool
299-
closeStartSubscribe, err := ps.Subscribe(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
299+
closeStartSubscribe, err := ps.SubscribeWithErr(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
300300
wspubsub.HandleWorkspaceEvent(
301-
slogtest.Make(t, nil),
302-
func(_ context.Context, e wspubsub.WorkspaceEvent) {
301+
func(_ context.Context, e wspubsub.WorkspaceEvent, err error) {
302+
if err != nil {
303+
return
304+
}
303305
if e.Kind == wspubsub.WorkspaceEventKindStateChange && e.WorkspaceID == workspace.ID {
304306
if !closed {
305307
close(startPublished)
@@ -404,10 +406,12 @@ func TestAcquireJob(t *testing.T) {
404406
})
405407

406408
stopPublished := make(chan struct{})
407-
closeStopSubscribe, err := ps.Subscribe(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
409+
closeStopSubscribe, err := ps.SubscribeWithErr(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
408410
wspubsub.HandleWorkspaceEvent(
409-
slogtest.Make(t, nil),
410-
func(_ context.Context, e wspubsub.WorkspaceEvent) {
411+
func(_ context.Context, e wspubsub.WorkspaceEvent, err error) {
412+
if err != nil {
413+
return
414+
}
411415
if e.Kind == wspubsub.WorkspaceEventKindStateChange && e.WorkspaceID == workspace.ID {
412416
close(stopPublished)
413417
}
@@ -885,7 +889,7 @@ func TestFailJob(t *testing.T) {
885889
auditor: auditor,
886890
})
887891
org := dbgen.Organization(t, db, database.Organization{})
888-
workspace := dbgen.Workspace(t, db, database.Workspace{
892+
workspace := dbgen.Workspace(t, db, database.WorkspaceTable{
889893
ID: uuid.New(),
890894
AutomaticUpdates: database.AutomaticUpdatesNever,
891895
OrganizationID: org.ID,
@@ -925,10 +929,12 @@ func TestFailJob(t *testing.T) {
925929
require.NoError(t, err)
926930

927931
publishedWorkspace := make(chan struct{})
928-
closeWorkspaceSubscribe, err := ps.Subscribe(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
932+
closeWorkspaceSubscribe, err := ps.SubscribeWithErr(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
929933
wspubsub.HandleWorkspaceEvent(
930-
slogtest.Make(t, nil),
931-
func(_ context.Context, e wspubsub.WorkspaceEvent) {
934+
func(_ context.Context, e wspubsub.WorkspaceEvent, err error) {
935+
if err != nil {
936+
return
937+
}
932938
if e.Kind == wspubsub.WorkspaceEventKindStateChange && e.WorkspaceID == workspace.ID {
933939
close(publishedWorkspace)
934940
}
@@ -1321,11 +1327,13 @@ func TestCompleteJob(t *testing.T) {
13211327
require.NoError(t, err)
13221328

13231329
publishedWorkspace := make(chan struct{})
1324-
closeWorkspaceSubscribe, err := ps.Subscribe(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
1330+
closeWorkspaceSubscribe, err := ps.SubscribeWithErr(wspubsub.WorkspaceEventChannel(workspaceTable.OwnerID),
13251331
wspubsub.HandleWorkspaceEvent(
1326-
slogtest.Make(t, nil),
1327-
func(_ context.Context, e wspubsub.WorkspaceEvent) {
1328-
if e.Kind == wspubsub.WorkspaceEventKindStateChange && e.WorkspaceID == workspace.ID {
1332+
func(_ context.Context, e wspubsub.WorkspaceEvent, err error) {
1333+
if err != nil {
1334+
return
1335+
}
1336+
if e.Kind == wspubsub.WorkspaceEventKindStateChange && e.WorkspaceID == workspaceTable.ID {
13291337
close(publishedWorkspace)
13301338
}
13311339
}))

coderd/workspaceagents.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -251,9 +251,9 @@ func (api *API) patchWorkspaceAgentLogs(rw http.ResponseWriter, r *http.Request)
251251
return
252252
}
253253

254-
api.publishWorkspaceUpdate(ctx, workspace.Workspace.OwnerID, wspubsub.WorkspaceEvent{
254+
api.publishWorkspaceUpdate(ctx, workspace.OwnerID, wspubsub.WorkspaceEvent{
255255
Kind: wspubsub.WorkspaceEventKindAgentLogsOverflow,
256-
WorkspaceID: workspace.Workspace.ID,
256+
WorkspaceID: workspace.ID,
257257
AgentID: &workspaceAgent.ID,
258258
})
259259

@@ -283,9 +283,9 @@ func (api *API) patchWorkspaceAgentLogs(rw http.ResponseWriter, r *http.Request)
283283
return
284284
}
285285

286-
api.publishWorkspaceUpdate(ctx, workspace.Workspace.OwnerID, wspubsub.WorkspaceEvent{
286+
api.publishWorkspaceUpdate(ctx, workspace.OwnerID, wspubsub.WorkspaceEvent{
287287
Kind: wspubsub.WorkspaceEventKindAgentFirstLogs,
288-
WorkspaceID: workspace.Workspace.ID,
288+
WorkspaceID: workspace.ID,
289289
AgentID: &workspaceAgent.ID,
290290
})
291291
}
@@ -416,10 +416,12 @@ func (api *API) workspaceAgentLogs(rw http.ResponseWriter, r *http.Request) {
416416
notifyCh <- struct{}{}
417417

418418
// Subscribe to workspace to detect new builds.
419-
closeSubscribeWorkspace, err := api.Pubsub.Subscribe(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
419+
closeSubscribeWorkspace, err := api.Pubsub.SubscribeWithErr(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
420420
wspubsub.HandleWorkspaceEvent(
421-
logger,
422-
func(_ context.Context, e wspubsub.WorkspaceEvent) {
421+
func(_ context.Context, e wspubsub.WorkspaceEvent, err error) {
422+
if err != nil {
423+
return
424+
}
423425
if e.Kind == wspubsub.WorkspaceEventKindStateChange && e.WorkspaceID == workspace.ID {
424426
select {
425427
case workspaceNotifyCh <- struct{}{}:

coderd/workspaces.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1676,10 +1676,12 @@ func (api *API) watchWorkspace(rw http.ResponseWriter, r *http.Request) {
16761676
})
16771677
}
16781678

1679-
cancelWorkspaceSubscribe, err := api.Pubsub.Subscribe(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
1679+
cancelWorkspaceSubscribe, err := api.Pubsub.SubscribeWithErr(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
16801680
wspubsub.HandleWorkspaceEvent(
1681-
api.Logger,
1682-
func(ctx context.Context, payload wspubsub.WorkspaceEvent) {
1681+
func(ctx context.Context, payload wspubsub.WorkspaceEvent, err error) {
1682+
if err != nil {
1683+
return
1684+
}
16831685
if payload.WorkspaceID != workspace.ID {
16841686
return
16851687
}

coderd/wspubsub/wspubsub.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@ import (
55
"encoding/json"
66
"fmt"
77

8-
"cdr.dev/slog"
9-
108
"github.com/google/uuid"
119
"golang.org/x/xerrors"
1210
)
@@ -17,18 +15,22 @@ func WorkspaceEventChannel(ownerID uuid.UUID) string {
1715
return fmt.Sprintf("workspace_owner:%s", ownerID)
1816
}
1917

20-
func HandleWorkspaceEvent(logger slog.Logger, cb func(ctx context.Context, payload WorkspaceEvent)) func(ctx context.Context, message []byte) {
21-
return func(ctx context.Context, message []byte) {
18+
func HandleWorkspaceEvent(cb func(ctx context.Context, payload WorkspaceEvent, err error)) func(ctx context.Context, message []byte, err error) {
19+
return func(ctx context.Context, message []byte, err error) {
20+
if err != nil {
21+
cb(ctx, WorkspaceEvent{}, xerrors.Errorf("workspace event pubsub: %w", err))
22+
return
23+
}
2224
var payload WorkspaceEvent
2325
if err := json.Unmarshal(message, &payload); err != nil {
24-
logger.Warn(ctx, "failed to unmarshal workspace event", slog.Error(err))
26+
cb(ctx, WorkspaceEvent{}, xerrors.Errorf("unmarshal workspace event"))
2527
return
2628
}
2729
if err := payload.Validate(); err != nil {
28-
logger.Warn(ctx, "invalid workspace event", slog.Error(err))
30+
cb(ctx, payload, xerrors.Errorf("validate workspace event"))
2931
return
3032
}
31-
cb(ctx, payload)
33+
cb(ctx, payload, err)
3234
}
3335
}
3436

0 commit comments

Comments
 (0)