Skip to content

Commit f2cce85

Browse files
committed
feat: Make workspace watching realtime instead of polling
This was leading to performance issues on the frontend, where the page should only be rendered if changes occur. While this could be changed on the frontend, it was always the intention to make this socket ~realtime anyways.
1 parent 5be6c70 commit f2cce85

File tree

12 files changed

+240
-76
lines changed

12 files changed

+240
-76
lines changed

coderd/activitybump.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ func activityBumpWorkspace(log slog.Logger, db database.Store, workspace databas
5454

5555
newDeadline := database.Now().Add(bumpAmount)
5656

57-
if err := s.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{
57+
if _, err := s.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{
5858
ID: build.ID,
5959
UpdatedAt: database.Now(),
6060
ProvisionerState: build.ProvisionerState,

coderd/database/databasefake/databasefake.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2823,7 +2823,7 @@ func (q *fakeQuerier) UpdateWorkspaceLastUsedAt(_ context.Context, arg database.
28232823
return sql.ErrNoRows
28242824
}
28252825

2826-
func (q *fakeQuerier) UpdateWorkspaceBuildByID(_ context.Context, arg database.UpdateWorkspaceBuildByIDParams) error {
2826+
func (q *fakeQuerier) UpdateWorkspaceBuildByID(_ context.Context, arg database.UpdateWorkspaceBuildByIDParams) (database.WorkspaceBuild, error) {
28272827
q.mutex.Lock()
28282828
defer q.mutex.Unlock()
28292829

@@ -2835,9 +2835,9 @@ func (q *fakeQuerier) UpdateWorkspaceBuildByID(_ context.Context, arg database.U
28352835
workspaceBuild.ProvisionerState = arg.ProvisionerState
28362836
workspaceBuild.Deadline = arg.Deadline
28372837
q.workspaceBuilds[index] = workspaceBuild
2838-
return nil
2838+
return workspaceBuild, nil
28392839
}
2840-
return sql.ErrNoRows
2840+
return database.WorkspaceBuild{}, sql.ErrNoRows
28412841
}
28422842

28432843
func (q *fakeQuerier) UpdateWorkspaceDeletedByID(_ context.Context, arg database.UpdateWorkspaceDeletedByIDParams) error {

coderd/database/querier.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/queries.sql.go

Lines changed: 20 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/queries/workspacebuilds.sql

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,12 +124,12 @@ INSERT INTO
124124
VALUES
125125
($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) RETURNING *;
126126

127-
-- name: UpdateWorkspaceBuildByID :exec
127+
-- name: UpdateWorkspaceBuildByID :one
128128
UPDATE
129129
workspace_builds
130130
SET
131131
updated_at = $2,
132132
provisioner_state = $3,
133133
deadline = $4
134134
WHERE
135-
id = $1;
135+
id = $1 RETURNING *;

coderd/httpapi/httpapi.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -228,14 +228,20 @@ func ServerSentEventSender(rw http.ResponseWriter, r *http.Request) (sendEvent f
228228
buf := &bytes.Buffer{}
229229
enc := json.NewEncoder(buf)
230230

231-
_, err := buf.WriteString(fmt.Sprintf("event: %s\ndata: ", sse.Type))
231+
_, err := buf.WriteString(fmt.Sprintf("event: %s\n", sse.Type))
232232
if err != nil {
233233
return err
234234
}
235235

236-
err = enc.Encode(sse.Data)
237-
if err != nil {
238-
return err
236+
if sse.Data != nil {
237+
_, err = buf.WriteString("data: ")
238+
if err != nil {
239+
return err
240+
}
241+
err = enc.Encode(sse.Data)
242+
if err != nil {
243+
return err
244+
}
239245
}
240246

241247
err = buf.WriteByte('\n')

coderd/provisionerdaemons.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,10 @@ func (server *provisionerdServer) AcquireJob(ctx context.Context, _ *proto.Empty
223223
if err != nil {
224224
return nil, failJob(fmt.Sprintf("get owner: %s", err))
225225
}
226+
err = server.Pubsub.Publish(watchWorkspaceChannel(workspace.ID), []byte{})
227+
if err != nil {
228+
return nil, failJob(fmt.Sprintf("publish workspace update: %s", err))
229+
}
226230

227231
// Compute parameters for the workspace to consume.
228232
parameters, err := parameter.Compute(ctx, server.Database, parameter.ComputeScope{
@@ -543,7 +547,7 @@ func (server *provisionerdServer) FailJob(ctx context.Context, failJob *proto.Fa
543547
if err != nil {
544548
return nil, xerrors.Errorf("unmarshal workspace provision input: %w", err)
545549
}
546-
err = server.Database.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{
550+
build, err := server.Database.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{
547551
ID: input.WorkspaceBuildID,
548552
UpdatedAt: database.Now(),
549553
ProvisionerState: jobType.WorkspaceBuild.State,
@@ -552,6 +556,10 @@ func (server *provisionerdServer) FailJob(ctx context.Context, failJob *proto.Fa
552556
if err != nil {
553557
return nil, xerrors.Errorf("update workspace build state: %w", err)
554558
}
559+
err = server.Pubsub.Publish(watchWorkspaceChannel(build.WorkspaceID), []byte{})
560+
if err != nil {
561+
return nil, xerrors.Errorf("update workspace: %w", err)
562+
}
555563
case *proto.FailedJob_TemplateImport_:
556564
}
557565

@@ -657,7 +665,7 @@ func (server *provisionerdServer) CompleteJob(ctx context.Context, completed *pr
657665
if err != nil {
658666
return xerrors.Errorf("update provisioner job: %w", err)
659667
}
660-
err = db.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{
668+
_, err = db.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{
661669
ID: workspaceBuild.ID,
662670
Deadline: workspaceDeadline,
663671
ProvisionerState: jobType.WorkspaceBuild.State,
@@ -692,6 +700,11 @@ func (server *provisionerdServer) CompleteJob(ctx context.Context, completed *pr
692700
if err != nil {
693701
return nil, xerrors.Errorf("complete job: %w", err)
694702
}
703+
704+
err = server.Pubsub.Publish(watchWorkspaceChannel(workspaceBuild.WorkspaceID), []byte{})
705+
if err != nil {
706+
return nil, xerrors.Errorf("update workspace: %w", err)
707+
}
695708
case *proto.CompletedJob_TemplateDryRun_:
696709
for _, resource := range jobType.TemplateDryRun.Resources {
697710
server.Logger.Info(ctx, "inserting template dry-run job resource",

coderd/workspaceagents.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -539,13 +539,17 @@ func (api *API) workspaceAgentCoordinate(rw http.ResponseWriter, r *http.Request
539539
Valid: true,
540540
}
541541
_ = updateConnectionTimes()
542+
_ = api.Pubsub.Publish(watchWorkspaceChannel(build.WorkspaceID), []byte{})
542543
}()
543544

544545
err = updateConnectionTimes()
545546
if err != nil {
546547
_ = conn.Close(websocket.StatusGoingAway, err.Error())
547548
return
548549
}
550+
if !api.publishWorkspaceUpdate(ctx, rw, build.WorkspaceID) {
551+
return
552+
}
549553

550554
// End span so we don't get long lived trace data.
551555
tracing.EndHTTPSpan(r, http.StatusOK, trace.SpanFromContext(ctx))
@@ -972,6 +976,34 @@ func (api *API) postWorkspaceAppHealth(rw http.ResponseWriter, r *http.Request)
972976
}
973977
}
974978

979+
resource, err := api.Database.GetWorkspaceResourceByID(r.Context(), workspaceAgent.ResourceID)
980+
if err != nil {
981+
httpapi.Write(r.Context(), rw, http.StatusInternalServerError, codersdk.Response{
982+
Message: "Internal error fetching workspace resource.",
983+
Detail: err.Error(),
984+
})
985+
return
986+
}
987+
job, err := api.Database.GetWorkspaceBuildByJobID(r.Context(), resource.JobID)
988+
if err != nil {
989+
httpapi.Write(r.Context(), rw, http.StatusInternalServerError, codersdk.Response{
990+
Message: "Internal error fetching workspace build.",
991+
Detail: err.Error(),
992+
})
993+
return
994+
}
995+
workspace, err := api.Database.GetWorkspaceByID(r.Context(), job.WorkspaceID)
996+
if err != nil {
997+
httpapi.Write(r.Context(), rw, http.StatusInternalServerError, codersdk.Response{
998+
Message: "Internal error fetching workspace.",
999+
Detail: err.Error(),
1000+
})
1001+
return
1002+
}
1003+
if !api.publishWorkspaceUpdate(r.Context(), rw, workspace.ID) {
1004+
return
1005+
}
1006+
9751007
httpapi.Write(r.Context(), rw, http.StatusOK, nil)
9761008
}
9771009

coderd/workspacebuilds.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -574,6 +574,10 @@ func (api *API) postWorkspaceBuilds(rw http.ResponseWriter, r *http.Request) {
574574
return
575575
}
576576

577+
if !api.publishWorkspaceUpdate(ctx, rw, workspace.ID) {
578+
return
579+
}
580+
577581
httpapi.Write(ctx, rw, http.StatusCreated, apiBuild)
578582
}
579583

@@ -632,6 +636,11 @@ func (api *API) patchCancelWorkspaceBuild(rw http.ResponseWriter, r *http.Reques
632636
})
633637
return
634638
}
639+
640+
if !api.publishWorkspaceUpdate(ctx, rw, workspace.ID) {
641+
return
642+
}
643+
635644
httpapi.Write(ctx, rw, http.StatusOK, codersdk.Response{
636645
Message: "Job has been marked as canceled...",
637646
})

coderd/workspaces.go

Lines changed: 69 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -634,6 +634,10 @@ func (api *API) patchWorkspace(rw http.ResponseWriter, r *http.Request) {
634634
return
635635
}
636636

637+
if !api.publishWorkspaceUpdate(ctx, rw, workspace.ID) {
638+
return
639+
}
640+
637641
aReq.New = newWorkspace
638642
rw.WriteHeader(http.StatusNoContent)
639643
}
@@ -839,7 +843,7 @@ func (api *API) putExtendWorkspace(rw http.ResponseWriter, r *http.Request) {
839843
return err
840844
}
841845

842-
if err := s.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{
846+
if _, err := s.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{
843847
ID: build.ID,
844848
UpdatedAt: build.UpdatedAt,
845849
ProvisionerState: build.ProvisionerState,
@@ -883,48 +887,60 @@ func (api *API) watchWorkspace(rw http.ResponseWriter, r *http.Request) {
883887
// Ignore all trace spans after this, they're not too useful.
884888
ctx = trace.ContextWithSpan(ctx, tracing.NoopSpan)
885889

886-
t := time.NewTicker(time.Second * 1)
887-
defer t.Stop()
890+
cancelSubscribe, err := api.Pubsub.Subscribe(watchWorkspaceChannel(workspace.ID), func(_ context.Context, _ []byte) {
891+
workspace, err := api.Database.GetWorkspaceByID(ctx, workspace.ID)
892+
if err != nil {
893+
_ = sendEvent(ctx, codersdk.ServerSentEvent{
894+
Type: codersdk.ServerSentEventTypeError,
895+
Data: codersdk.Response{
896+
Message: "Internal error fetching workspace.",
897+
Detail: err.Error(),
898+
},
899+
})
900+
return
901+
}
902+
903+
data, err := api.workspaceData(ctx, []database.Workspace{workspace})
904+
if err != nil {
905+
_ = sendEvent(ctx, codersdk.ServerSentEvent{
906+
Type: codersdk.ServerSentEventTypeError,
907+
Data: codersdk.Response{
908+
Message: "Internal error fetching workspace data.",
909+
Detail: err.Error(),
910+
},
911+
})
912+
return
913+
}
914+
915+
_ = sendEvent(ctx, codersdk.ServerSentEvent{
916+
Type: codersdk.ServerSentEventTypeData,
917+
Data: convertWorkspace(
918+
workspace,
919+
data.builds[0],
920+
data.templates[0],
921+
findUser(workspace.OwnerID, data.users),
922+
),
923+
})
924+
})
925+
if err != nil {
926+
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
927+
Message: "Internal error subscribing to workspace events.",
928+
Detail: err.Error(),
929+
})
930+
return
931+
}
932+
defer cancelSubscribe()
933+
934+
_ = sendEvent(ctx, codersdk.ServerSentEvent{
935+
Type: codersdk.ServerSentEventTypePing,
936+
})
937+
888938
for {
889939
select {
890940
case <-ctx.Done():
891941
return
892942
case <-senderClosed:
893943
return
894-
case <-t.C:
895-
workspace, err := api.Database.GetWorkspaceByID(ctx, workspace.ID)
896-
if err != nil {
897-
_ = sendEvent(ctx, codersdk.ServerSentEvent{
898-
Type: codersdk.ServerSentEventTypeError,
899-
Data: codersdk.Response{
900-
Message: "Internal error fetching workspace.",
901-
Detail: err.Error(),
902-
},
903-
})
904-
return
905-
}
906-
907-
data, err := api.workspaceData(ctx, []database.Workspace{workspace})
908-
if err != nil {
909-
_ = sendEvent(ctx, codersdk.ServerSentEvent{
910-
Type: codersdk.ServerSentEventTypeError,
911-
Data: codersdk.Response{
912-
Message: "Internal error fetching workspace data.",
913-
Detail: err.Error(),
914-
},
915-
})
916-
return
917-
}
918-
919-
_ = sendEvent(ctx, codersdk.ServerSentEvent{
920-
Type: codersdk.ServerSentEventTypeData,
921-
Data: convertWorkspace(
922-
workspace,
923-
data.builds[0],
924-
data.templates[0],
925-
findUser(workspace.OwnerID, data.users),
926-
),
927-
})
928944
}
929945
}
930946
}
@@ -1213,3 +1229,19 @@ func splitQueryParameterByDelimiter(query string, delimiter rune, maintainQuotes
12131229

12141230
return parts
12151231
}
1232+
1233+
func watchWorkspaceChannel(id uuid.UUID) string {
1234+
return fmt.Sprintf("workspace:%s", id)
1235+
}
1236+
1237+
func (api *API) publishWorkspaceUpdate(ctx context.Context, rw http.ResponseWriter, workspaceID uuid.UUID) bool {
1238+
err := api.Pubsub.Publish(watchWorkspaceChannel(workspaceID), []byte{})
1239+
if err != nil {
1240+
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
1241+
Message: "Internal error publishing workspace update.",
1242+
Detail: err.Error(),
1243+
})
1244+
return false
1245+
}
1246+
return true
1247+
}

0 commit comments

Comments
 (0)