@@ -3,7 +3,9 @@ package coderd
3
3
import (
4
4
"archive/tar"
5
5
"bytes"
6
+ "context"
6
7
"database/sql"
8
+ "encoding/json"
7
9
"errors"
8
10
"fmt"
9
11
"net/http"
@@ -34,6 +36,14 @@ type ProjectHistory struct {
34
36
StorageMethod database.ProjectStorageMethod `json:"storage_method"`
35
37
}
36
38
39
+ type ProjectHistoryLog struct {
40
+ ID uuid.UUID
41
+ CreatedAt time.Time `json:"created_at"`
42
+ Source database.LogSource `json:"log_source"`
43
+ Level database.LogLevel `json:"log_level"`
44
+ Output string `json:"output"`
45
+ }
46
+
37
47
// CreateProjectRequest enables callers to create a new Project.
38
48
type CreateProjectRequest struct {
39
49
Name string `json:"name" validate:"username,required"`
@@ -48,6 +58,7 @@ type CreateProjectVersionRequest struct {
48
58
49
59
type projects struct {
50
60
Database database.Store
61
+ Pubsub database.Pubsub
51
62
}
52
63
53
64
// Lists all projects the authenticated user has access to.
@@ -222,6 +233,115 @@ func (p *projects) createProjectHistory(rw http.ResponseWriter, r *http.Request)
222
233
render .JSON (rw , r , convertProjectHistory (history ))
223
234
}
224
235
236
+ func (p * projects ) projectHistoryLogs (rw http.ResponseWriter , r * http.Request ) {
237
+ projectHistory := httpmw .ProjectHistoryParam (r )
238
+ follow := r .URL .Query ().Has ("follow" )
239
+
240
+ if ! follow {
241
+ // If we're not attempting to follow logs,
242
+ // we can exit immediately!
243
+ logs , err := p .Database .GetProjectHistoryLogsByIDBefore (r .Context (), database.GetProjectHistoryLogsByIDBeforeParams {
244
+ ProjectHistoryID : projectHistory .ID ,
245
+ CreatedAt : time .Now (),
246
+ })
247
+ if errors .Is (err , sql .ErrNoRows ) {
248
+ err = nil
249
+ }
250
+ if err != nil {
251
+ httpapi .Write (rw , http .StatusInternalServerError , httpapi.Response {
252
+ Message : fmt .Sprintf ("get project history logs: %s" , err ),
253
+ })
254
+ return
255
+ }
256
+ render .Status (r , http .StatusOK )
257
+ render .JSON (rw , r , logs )
258
+ return
259
+ }
260
+
261
+ // We only want to fetch messages before subscribe, so that
262
+ // there aren't any duplicates.
263
+ timeBeforeSubscribe := database .Now ()
264
+ // Start subscribing immediately, otherwise we could miss messages
265
+ // that occur during the database read.
266
+ newLogNotify := make (chan ProjectHistoryLog , 128 )
267
+ cancelNewLogNotify , err := p .Pubsub .Subscribe (projectHistoryLogsChannel (projectHistory .ID ), func (ctx context.Context , message []byte ) {
268
+ var logs []database.ProjectHistoryLog
269
+ err := json .Unmarshal (message , & logs )
270
+ if err != nil {
271
+ httpapi .Write (rw , http .StatusInternalServerError , httpapi.Response {
272
+ Message : fmt .Sprintf ("parse logs from publish: %s" , err ),
273
+ })
274
+ return
275
+ }
276
+ for _ , log := range logs {
277
+ // If many logs are sent during our database query, this channel
278
+ // could overflow. The Go scheduler would decide the order to send
279
+ // logs in at that point, which is an unfortunate (but not fatal)
280
+ // flaw of this approach.
281
+ //
282
+ // This is an extremely unlikely outcome given reasonable database
283
+ // query times.
284
+ newLogNotify <- convertProjectHistoryLog (log )
285
+ }
286
+ })
287
+ if err != nil {
288
+ httpapi .Write (rw , http .StatusInternalServerError , httpapi.Response {
289
+ Message : fmt .Sprintf ("listen for new logs: %s" , err ),
290
+ })
291
+ return
292
+ }
293
+ defer cancelNewLogNotify ()
294
+
295
+ // In-between here logs could be missed!
296
+ projectHistoryLogs , err := p .Database .GetProjectHistoryLogsByIDBefore (r .Context (), database.GetProjectHistoryLogsByIDBeforeParams {
297
+ ProjectHistoryID : projectHistory .ID ,
298
+ CreatedAt : timeBeforeSubscribe ,
299
+ })
300
+ if errors .Is (err , sql .ErrNoRows ) {
301
+ err = nil
302
+ }
303
+ if err != nil {
304
+ httpapi .Write (rw , http .StatusInternalServerError , httpapi.Response {
305
+ Message : fmt .Sprintf ("get project history logs: %s" , err ),
306
+ })
307
+ return
308
+ }
309
+
310
+ // "follow" uses the ndjson format to stream data.
311
+ // See: https://canjs.com/doc/can-ndjson-stream.html
312
+ rw .Header ().Set ("Content-Type" , "application/stream+json" )
313
+ rw .WriteHeader (http .StatusOK )
314
+ rw .(http.Flusher ).Flush ()
315
+
316
+ // The Go stdlib JSON encoder appends a newline character after message write.
317
+ encoder := json .NewEncoder (rw )
318
+ for _ , projectHistoryLog := range projectHistoryLogs {
319
+ // JSON separated by a newline
320
+ err = encoder .Encode (convertProjectHistoryLog (projectHistoryLog ))
321
+ if err != nil {
322
+ httpapi .Write (rw , http .StatusInternalServerError , httpapi.Response {
323
+ Message : fmt .Sprintf ("marshal: %s" , err ),
324
+ })
325
+ return
326
+ }
327
+ }
328
+
329
+ for {
330
+ select {
331
+ case <- r .Context ().Done ():
332
+ return
333
+ case log := <- newLogNotify :
334
+ err = encoder .Encode (log )
335
+ if err != nil {
336
+ httpapi .Write (rw , http .StatusInternalServerError , httpapi.Response {
337
+ Message : fmt .Sprintf ("marshal follow: %s" , err ),
338
+ })
339
+ return
340
+ }
341
+ }
342
+ }
343
+ }
344
+
225
345
func convertProjectHistory (history database.ProjectHistory ) ProjectHistory {
226
346
return ProjectHistory {
227
347
ID : history .ID ,
@@ -231,3 +351,17 @@ func convertProjectHistory(history database.ProjectHistory) ProjectHistory {
231
351
Name : history .Name ,
232
352
}
233
353
}
354
+
355
+ func convertProjectHistoryLog (log database.ProjectHistoryLog ) ProjectHistoryLog {
356
+ return ProjectHistoryLog {
357
+ ID : log .ID ,
358
+ CreatedAt : log .CreatedAt ,
359
+ Source : log .Source ,
360
+ Level : log .Level ,
361
+ Output : log .Output ,
362
+ }
363
+ }
364
+
365
+ func projectHistoryLogsChannel (projectHistoryID uuid.UUID ) string {
366
+ return fmt .Sprintf ("project-history-logs:%s" , projectHistoryID )
367
+ }
0 commit comments