Skip to content

Commit c440af4

Browse files
committed
Log streaming
1 parent c01860d commit c440af4

File tree

12 files changed

+578
-12
lines changed

12 files changed

+578
-12
lines changed

coderd/coderd.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,21 +16,25 @@ import (
1616
type Options struct {
1717
Logger slog.Logger
1818
Database database.Store
19+
Pubsub database.Pubsub
1920
}
2021

2122
// New constructs the Coder API into an HTTP handler.
2223
func New(options *Options) http.Handler {
2324
projects := &projects{
2425
Database: options.Database,
26+
Pubsub: options.Pubsub,
2527
}
2628
provisionerd := &provisionerd{
2729
Database: options.Database,
30+
Pubsub: options.Pubsub,
2831
}
2932
users := &users{
3033
Database: options.Database,
3134
}
3235
workspaces := &workspaces{
3336
Database: options.Database,
37+
Pubsub: options.Pubsub,
3438
}
3539

3640
r := chi.NewRouter()
@@ -71,6 +75,10 @@ func New(options *Options) http.Handler {
7175
r.Route("/history", func(r chi.Router) {
7276
r.Get("/", projects.allProjectHistory)
7377
r.Post("/", projects.createProjectHistory)
78+
r.Route("/{projecthistory}", func(r chi.Router) {
79+
r.Use(httpmw.ExtractProjectHistoryParam(options.Database))
80+
r.Get("/logs", projects.projectHistoryLogs)
81+
})
7482
})
7583
r.Get("/workspaces", workspaces.allWorkspacesForProject)
7684
})
@@ -93,6 +101,10 @@ func New(options *Options) http.Handler {
93101
r.Post("/", workspaces.createWorkspaceHistory)
94102
r.Get("/", workspaces.listAllWorkspaceHistory)
95103
r.Get("/latest", workspaces.latestWorkspaceHistory)
104+
r.Route("/{workspacehistory}", func(r chi.Router) {
105+
r.Use(httpmw.ExtractWorkspaceHistoryParam(options.Database))
106+
r.Get("/logs", workspaces.workspaceHistoryLogs)
107+
})
96108
})
97109
})
98110
})

coderd/coderdtest/coderdtest.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ func (s *Server) RandomInitialUser(t *testing.T) coderd.CreateInitialUserRequest
6262
func New(t *testing.T) Server {
6363
// This can be hotswapped for a live database instance.
6464
db := databasefake.New()
65+
pubsub := database.NewPubsubInMemory()
6566
if os.Getenv("DB") != "" {
6667
connectionURL, close, err := postgres.Open()
6768
require.NoError(t, err)
@@ -74,11 +75,15 @@ func New(t *testing.T) Server {
7475
err = database.Migrate(sqlDB)
7576
require.NoError(t, err)
7677
db = database.New(sqlDB)
78+
79+
pubsub, err = database.NewPubsub(context.Background(), sqlDB, connectionURL)
80+
require.NoError(t, err)
7781
}
7882

7983
handler := coderd.New(&coderd.Options{
8084
Logger: slogtest.Make(t, nil),
8185
Database: db,
86+
Pubsub: pubsub,
8287
})
8388
srv := httptest.NewServer(handler)
8489
serverURL, err := url.Parse(srv.URL)

coderd/projects.go

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@ package coderd
33
import (
44
"archive/tar"
55
"bytes"
6+
"context"
67
"database/sql"
8+
"encoding/json"
79
"errors"
810
"fmt"
911
"net/http"
@@ -34,6 +36,14 @@ type ProjectHistory struct {
3436
StorageMethod database.ProjectStorageMethod `json:"storage_method"`
3537
}
3638

39+
type ProjectHistoryLog struct {
40+
ID uuid.UUID
41+
CreatedAt time.Time `json:"created_at"`
42+
Source database.LogSource `json:"log_source"`
43+
Level database.LogLevel `json:"log_level"`
44+
Output string `json:"output"`
45+
}
46+
3747
// CreateProjectRequest enables callers to create a new Project.
3848
type CreateProjectRequest struct {
3949
Name string `json:"name" validate:"username,required"`
@@ -48,6 +58,7 @@ type CreateProjectVersionRequest struct {
4858

4959
type projects struct {
5060
Database database.Store
61+
Pubsub database.Pubsub
5162
}
5263

5364
// Lists all projects the authenticated user has access to.
@@ -222,6 +233,115 @@ func (p *projects) createProjectHistory(rw http.ResponseWriter, r *http.Request)
222233
render.JSON(rw, r, convertProjectHistory(history))
223234
}
224235

236+
func (p *projects) projectHistoryLogs(rw http.ResponseWriter, r *http.Request) {
237+
projectHistory := httpmw.ProjectHistoryParam(r)
238+
follow := r.URL.Query().Has("follow")
239+
240+
if !follow {
241+
// If we're not attempting to follow logs,
242+
// we can exit immediately!
243+
logs, err := p.Database.GetProjectHistoryLogsByIDBefore(r.Context(), database.GetProjectHistoryLogsByIDBeforeParams{
244+
ProjectHistoryID: projectHistory.ID,
245+
CreatedAt: time.Now(),
246+
})
247+
if errors.Is(err, sql.ErrNoRows) {
248+
err = nil
249+
}
250+
if err != nil {
251+
httpapi.Write(rw, http.StatusInternalServerError, httpapi.Response{
252+
Message: fmt.Sprintf("get project history logs: %s", err),
253+
})
254+
return
255+
}
256+
render.Status(r, http.StatusOK)
257+
render.JSON(rw, r, logs)
258+
return
259+
}
260+
261+
// We only want to fetch messages before subscribe, so that
262+
// there aren't any duplicates.
263+
timeBeforeSubscribe := database.Now()
264+
// Start subscribing immediately, otherwise we could miss messages
265+
// that occur during the database read.
266+
newLogNotify := make(chan ProjectHistoryLog, 128)
267+
cancelNewLogNotify, err := p.Pubsub.Subscribe(projectHistoryLogsChannel(projectHistory.ID), func(ctx context.Context, message []byte) {
268+
var logs []database.ProjectHistoryLog
269+
err := json.Unmarshal(message, &logs)
270+
if err != nil {
271+
httpapi.Write(rw, http.StatusInternalServerError, httpapi.Response{
272+
Message: fmt.Sprintf("parse logs from publish: %s", err),
273+
})
274+
return
275+
}
276+
for _, log := range logs {
277+
// If many logs are sent during our database query, this channel
278+
// could overflow. The Go scheduler would decide the order to send
279+
// logs in at that point, which is an unfortunate (but not fatal)
280+
// flaw of this approach.
281+
//
282+
// This is an extremely unlikely outcome given reasonable database
283+
// query times.
284+
newLogNotify <- convertProjectHistoryLog(log)
285+
}
286+
})
287+
if err != nil {
288+
httpapi.Write(rw, http.StatusInternalServerError, httpapi.Response{
289+
Message: fmt.Sprintf("listen for new logs: %s", err),
290+
})
291+
return
292+
}
293+
defer cancelNewLogNotify()
294+
295+
// In-between here logs could be missed!
296+
projectHistoryLogs, err := p.Database.GetProjectHistoryLogsByIDBefore(r.Context(), database.GetProjectHistoryLogsByIDBeforeParams{
297+
ProjectHistoryID: projectHistory.ID,
298+
CreatedAt: timeBeforeSubscribe,
299+
})
300+
if errors.Is(err, sql.ErrNoRows) {
301+
err = nil
302+
}
303+
if err != nil {
304+
httpapi.Write(rw, http.StatusInternalServerError, httpapi.Response{
305+
Message: fmt.Sprintf("get project history logs: %s", err),
306+
})
307+
return
308+
}
309+
310+
// "follow" uses the ndjson format to stream data.
311+
// See: https://canjs.com/doc/can-ndjson-stream.html
312+
rw.Header().Set("Content-Type", "application/stream+json")
313+
rw.WriteHeader(http.StatusOK)
314+
rw.(http.Flusher).Flush()
315+
316+
// The Go stdlib JSON encoder appends a newline character after message write.
317+
encoder := json.NewEncoder(rw)
318+
for _, projectHistoryLog := range projectHistoryLogs {
319+
// JSON separated by a newline
320+
err = encoder.Encode(convertProjectHistoryLog(projectHistoryLog))
321+
if err != nil {
322+
httpapi.Write(rw, http.StatusInternalServerError, httpapi.Response{
323+
Message: fmt.Sprintf("marshal: %s", err),
324+
})
325+
return
326+
}
327+
}
328+
329+
for {
330+
select {
331+
case <-r.Context().Done():
332+
return
333+
case log := <-newLogNotify:
334+
err = encoder.Encode(log)
335+
if err != nil {
336+
httpapi.Write(rw, http.StatusInternalServerError, httpapi.Response{
337+
Message: fmt.Sprintf("marshal follow: %s", err),
338+
})
339+
return
340+
}
341+
}
342+
}
343+
}
344+
225345
func convertProjectHistory(history database.ProjectHistory) ProjectHistory {
226346
return ProjectHistory{
227347
ID: history.ID,
@@ -231,3 +351,17 @@ func convertProjectHistory(history database.ProjectHistory) ProjectHistory {
231351
Name: history.Name,
232352
}
233353
}
354+
355+
func convertProjectHistoryLog(log database.ProjectHistoryLog) ProjectHistoryLog {
356+
return ProjectHistoryLog{
357+
ID: log.ID,
358+
CreatedAt: log.CreatedAt,
359+
Source: log.Source,
360+
Level: log.Level,
361+
Output: log.Output,
362+
}
363+
}
364+
365+
func projectHistoryLogsChannel(projectHistoryID uuid.UUID) string {
366+
return fmt.Sprintf("project-history-logs:%s", projectHistoryID)
367+
}

0 commit comments

Comments
 (0)