Skip to content

Commit 487ee95

Browse files
feat(site): use websocket connection for devcontainer updates
Instead of polling every 10 seconds, we instead use a WebSocket connection for more timely updates.
1 parent 61b6562 commit 487ee95

File tree

6 files changed

+222
-10
lines changed

6 files changed

+222
-10
lines changed

agent/agentcontainers/api.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,10 @@ import (
2828
"github.com/coder/coder/v2/coderd/httpapi"
2929
"github.com/coder/coder/v2/codersdk"
3030
"github.com/coder/coder/v2/codersdk/agentsdk"
31+
"github.com/coder/coder/v2/codersdk/wsjson"
3132
"github.com/coder/coder/v2/provisioner"
3233
"github.com/coder/quartz"
34+
"github.com/coder/websocket"
3335
)
3436

3537
const (
@@ -74,6 +76,7 @@ type API struct {
7476

7577
mu sync.RWMutex // Protects the following fields.
7678
initDone chan struct{} // Closed by Init.
79+
updateChans []chan struct{}
7780
closed bool
7881
containers codersdk.WorkspaceAgentListContainersResponse // Output from the last list operation.
7982
containersErr error // Error from the last list operation.
@@ -535,6 +538,7 @@ func (api *API) Routes() http.Handler {
535538
r.Use(ensureInitDoneMW)
536539

537540
r.Get("/", api.handleList)
541+
r.Get("/watch", api.watchContainers)
538542
// TODO(mafredri): Simplify this route as the previous /devcontainers
539543
// /-route was dropped. We can drop the /devcontainers prefix here too.
540544
r.Route("/devcontainers/{devcontainer}", func(r chi.Router) {
@@ -544,6 +548,60 @@ func (api *API) Routes() http.Handler {
544548
return r
545549
}
546550

551+
func (api *API) watchContainers(rw http.ResponseWriter, r *http.Request) {
552+
var (
553+
ctx = r.Context()
554+
)
555+
556+
conn, err := websocket.Accept(rw, r, nil)
557+
if err != nil {
558+
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
559+
Message: "Failed to upgrade connection to websocket.",
560+
Detail: err.Error(),
561+
})
562+
return
563+
}
564+
565+
go httpapi.Heartbeat(ctx, conn)
566+
defer conn.Close(websocket.StatusNormalClosure, "connection closed")
567+
568+
encoder := wsjson.NewEncoder[codersdk.WorkspaceAgentListContainersResponse](conn, websocket.MessageText)
569+
defer encoder.Close(websocket.StatusNormalClosure)
570+
571+
updateCh := make(chan struct{})
572+
defer close(updateCh)
573+
574+
api.mu.Lock()
575+
api.updateChans = append(api.updateChans, updateCh)
576+
api.mu.Unlock()
577+
578+
defer func() {
579+
api.mu.Lock()
580+
api.updateChans = slices.DeleteFunc(api.updateChans, func(ch chan struct{}) bool {
581+
return ch == updateCh
582+
})
583+
api.mu.Unlock()
584+
}()
585+
586+
for {
587+
select {
588+
case <-ctx.Done():
589+
return
590+
591+
case <-updateCh:
592+
ct, err := api.getContainers()
593+
if err != nil {
594+
api.logger.Error(ctx, "get containers", slog.Error(err))
595+
} else {
596+
if err := encoder.Encode(ct); err != nil {
597+
api.logger.Error(ctx, "encode container list", slog.Error(err))
598+
}
599+
}
600+
601+
}
602+
}
603+
}
604+
547605
// handleList handles the HTTP request to list containers.
548606
func (api *API) handleList(rw http.ResponseWriter, r *http.Request) {
549607
ct, err := api.getContainers()
@@ -585,6 +643,11 @@ func (api *API) updateContainers(ctx context.Context) error {
585643

586644
api.processUpdatedContainersLocked(ctx, updated)
587645

646+
// Broadcast our updates
647+
for _, ch := range api.updateChans {
648+
ch <- struct{}{}
649+
}
650+
588651
api.logger.Debug(ctx, "containers updated successfully", slog.F("container_count", len(api.containers.Containers)), slog.F("warning_count", len(api.containers.Warnings)), slog.F("devcontainer_count", len(api.knownDevcontainers)))
589652

590653
return nil

coderd/coderd.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1329,6 +1329,7 @@ func New(options *Options) *API {
13291329
r.Get("/listening-ports", api.workspaceAgentListeningPorts)
13301330
r.Get("/connection", api.workspaceAgentConnection)
13311331
r.Get("/containers", api.workspaceAgentListContainers)
1332+
r.Get("/containers/watch", api.watchWorkspaceAgentContainers)
13321333
r.Post("/containers/devcontainers/{devcontainer}/recreate", api.workspaceAgentRecreateDevcontainer)
13331334
r.Get("/coordinate", api.workspaceAgentClientCoordinate)
13341335

coderd/workspaceagents.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -801,6 +801,90 @@ func (api *API) workspaceAgentListeningPorts(rw http.ResponseWriter, r *http.Req
801801
httpapi.Write(ctx, rw, http.StatusOK, portsResponse)
802802
}
803803

804+
// @Summary Watch agent for container updates.
805+
func (api *API) watchWorkspaceAgentContainers(rw http.ResponseWriter, r *http.Request) {
806+
var (
807+
ctx = r.Context()
808+
workspaceAgent = httpmw.WorkspaceAgentParam(r)
809+
)
810+
811+
// If the agent is unreachable, the request will hang. Assume that if we
812+
// don't get a response after 30s that the agent is unreachable.
813+
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
814+
defer cancel()
815+
apiAgent, err := db2sdk.WorkspaceAgent(
816+
api.DERPMap(),
817+
*api.TailnetCoordinator.Load(),
818+
workspaceAgent,
819+
nil,
820+
nil,
821+
nil,
822+
api.AgentInactiveDisconnectTimeout,
823+
api.DeploymentValues.AgentFallbackTroubleshootingURL.String(),
824+
)
825+
if err != nil {
826+
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
827+
Message: "Internal error reading workspace agent.",
828+
Detail: err.Error(),
829+
})
830+
return
831+
}
832+
if apiAgent.Status != codersdk.WorkspaceAgentConnected {
833+
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
834+
Message: fmt.Sprintf("Agent state is %q, it must be in the %q state.", apiAgent.Status, codersdk.WorkspaceAgentConnected),
835+
})
836+
return
837+
}
838+
839+
agentConn, release, err := api.agentProvider.AgentConn(ctx, workspaceAgent.ID)
840+
if err != nil {
841+
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
842+
Message: "Internal error dialing workspace agent.",
843+
Detail: err.Error(),
844+
})
845+
return
846+
}
847+
defer release()
848+
849+
containersCh, closer, err := agentConn.WatchContainers(ctx)
850+
if err != nil {
851+
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
852+
Message: "Internal error watching agent's containers.",
853+
Detail: err.Error(),
854+
})
855+
return
856+
}
857+
defer closer.Close()
858+
859+
conn, err := websocket.Accept(rw, r, nil)
860+
if err != nil {
861+
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
862+
Message: "Failed to upgrade connection to websocket.",
863+
Detail: err.Error(),
864+
})
865+
return
866+
}
867+
868+
go httpapi.Heartbeat(ctx, conn)
869+
defer conn.Close(websocket.StatusNormalClosure, "connection closed")
870+
871+
encoder := wsjson.NewEncoder[codersdk.WorkspaceAgentListContainersResponse](conn, websocket.MessageText)
872+
defer encoder.Close(websocket.StatusNormalClosure)
873+
874+
for {
875+
select {
876+
case <-ctx.Done():
877+
return
878+
879+
case containers := <-containersCh:
880+
if err := encoder.Encode(containers); err != nil {
881+
api.Logger.Error(ctx, "encode containers", slog.Error(err))
882+
return
883+
}
884+
}
885+
}
886+
}
887+
804888
// @Summary Get running containers for workspace agent
805889
// @ID get-running-containers-for-workspace-agent
806890
// @Security CoderSessionToken

codersdk/workspacesdk/agentconn.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"strconv"
1313
"time"
1414

15+
"cdr.dev/slog"
1516
"github.com/google/uuid"
1617
"github.com/hashicorp/go-multierror"
1718
"golang.org/x/crypto/ssh"
@@ -23,7 +24,9 @@ import (
2324
"github.com/coder/coder/v2/coderd/tracing"
2425
"github.com/coder/coder/v2/codersdk"
2526
"github.com/coder/coder/v2/codersdk/healthsdk"
27+
"github.com/coder/coder/v2/codersdk/wsjson"
2628
"github.com/coder/coder/v2/tailnet"
29+
"github.com/coder/websocket"
2730
)
2831

2932
// NewAgentConn creates a new WorkspaceAgentConn. `conn` may be unique
@@ -387,6 +390,27 @@ func (c *AgentConn) ListContainers(ctx context.Context) (codersdk.WorkspaceAgent
387390
return resp, json.NewDecoder(res.Body).Decode(&resp)
388391
}
389392

393+
func (c *AgentConn) WatchContainers(ctx context.Context) (<-chan codersdk.WorkspaceAgentListContainersResponse, io.Closer, error) {
394+
ctx, span := tracing.StartSpan(ctx)
395+
defer span.End()
396+
397+
host := net.JoinHostPort(c.agentAddress().String(), strconv.Itoa(AgentHTTPAPIServerPort))
398+
url := fmt.Sprintf("http://%s%s", host, "/api/v0/containers/watch")
399+
400+
conn, res, err := websocket.Dial(ctx, url, &websocket.DialOptions{
401+
HTTPClient: c.apiClient(),
402+
})
403+
if err != nil {
404+
if res != nil {
405+
return nil, nil, codersdk.ReadBodyAsError(res)
406+
}
407+
return nil, nil, err
408+
}
409+
410+
d := wsjson.NewDecoder[codersdk.WorkspaceAgentListContainersResponse](conn, websocket.MessageText, slog.Logger{})
411+
return d.Chan(), d, nil
412+
}
413+
390414
// RecreateDevcontainer recreates a devcontainer with the given container.
391415
// This is a blocking call and will wait for the container to be recreated.
392416
func (c *AgentConn) RecreateDevcontainer(ctx context.Context, devcontainerID string) (codersdk.Response, error) {

site/src/api/api.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,17 @@ export const watchWorkspace = (
129129
});
130130
};
131131

132+
export const watchAgentContainers = (
133+
agentId: string,
134+
labels?: string[],
135+
): OneWayWebSocket<TypesGen.WorkspaceAgentDevcontainer[]> => {
136+
const params = new URLSearchParams(labels?.map((label) => ["label", label]));
137+
138+
return new OneWayWebSocket({
139+
apiRoute: `/api/v2/workspaceagents/${agentId}/containers/watch?${params.toString()}`,
140+
});
141+
};
142+
132143
type WatchInboxNotificationsParams = Readonly<{
133144
read_status?: "read" | "unread" | "all";
134145
}>;

site/src/modules/resources/AgentRow.tsx

Lines changed: 39 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,12 @@ import type { Interpolation, Theme } from "@emotion/react";
22
import Collapse from "@mui/material/Collapse";
33
import Divider from "@mui/material/Divider";
44
import Skeleton from "@mui/material/Skeleton";
5-
import { API } from "api/api";
5+
import { API, watchAgentContainers } from "api/api";
66
import type {
77
Template,
88
Workspace,
99
WorkspaceAgent,
10+
WorkspaceAgentDevcontainer,
1011
WorkspaceAgentMetadata,
1112
} from "api/typesGenerated";
1213
import { isAxiosError } from "axios";
@@ -25,7 +26,7 @@ import {
2526
useRef,
2627
useState,
2728
} from "react";
28-
import { useQuery } from "react-query";
29+
import { useQuery, useQueryClient } from "react-query";
2930
import AutoSizer from "react-virtualized-auto-sizer";
3031
import type { FixedSizeList as List, ListOnScrollProps } from "react-window";
3132
import { AgentApps, organizeAgentApps } from "./AgentApps/AgentApps";
@@ -42,6 +43,9 @@ import { AgentSSHButton } from "./SSHButton/SSHButton";
4243
import { TerminalLink } from "./TerminalLink/TerminalLink";
4344
import { VSCodeDesktopButton } from "./VSCodeDesktopButton/VSCodeDesktopButton";
4445
import { useAgentLogs } from "./useAgentLogs";
46+
import { OneWayWebSocket } from "utils/OneWayWebSocket";
47+
import { displayError } from "components/GlobalSnackbar/utils";
48+
import { useEffectEvent } from "hooks/hookPolyfills";
4549

4650
interface AgentRowProps {
4751
agent: WorkspaceAgent;
@@ -73,6 +77,7 @@ export const AgentRow: FC<AgentRowProps> = ({
7377
const showVSCode = hasVSCodeApp && !browser_only;
7478

7579
const hasStartupFeatures = Boolean(agent.logs_length);
80+
const queryClient = useQueryClient();
7681
const { proxy } = useProxy();
7782
const [showLogs, setShowLogs] = useState(
7883
["starting", "start_timeout"].includes(agent.lifecycle_state) &&
@@ -138,16 +143,40 @@ export const AgentRow: FC<AgentRowProps> = ({
138143
queryFn: () => API.getAgentContainers(agent.id),
139144
enabled: agent.status === "connected",
140145
select: (res) => res.devcontainers,
141-
// TODO: Implement a websocket connection to get updates on containers
142-
// without having to poll.
143-
refetchInterval: ({ state }) => {
144-
const { error } = state;
145-
return isAxiosError(error) && error.response?.status === 403
146-
? false
147-
: 10_000;
148-
},
149146
});
150147

148+
const updateDevcontainersCache = useEffectEvent(
149+
async (devcontainers: WorkspaceAgentDevcontainer[]) => {
150+
const queryKey = ["agents", agent.id, "containers"];
151+
152+
queryClient.setQueryData(queryKey, devcontainers);
153+
await queryClient.invalidateQueries({ queryKey });
154+
},
155+
);
156+
157+
useEffect(() => {
158+
const socket = watchAgentContainers(agent.id);
159+
160+
socket.addEventListener("message", (event) => {
161+
if (event.parseError) {
162+
displayError(
163+
"Unable to process latest data from the server. Please try refreshing the page.",
164+
);
165+
return;
166+
}
167+
168+
updateDevcontainersCache(event.parsedMessage);
169+
});
170+
171+
socket.addEventListener("error", () => {
172+
displayError(
173+
"Unable to get workspace containers. Connection has been closed.",
174+
);
175+
});
176+
177+
return () => socket.close();
178+
}, [agent.id, updateDevcontainersCache]);
179+
151180
// This is used to show the parent apps of the devcontainer.
152181
const [showParentApps, setShowParentApps] = useState(false);
153182

0 commit comments

Comments
 (0)