@@ -13,6 +13,7 @@ import (
13
13
"time"
14
14
15
15
"github.com/google/uuid"
16
+ "go.uber.org/atomic"
16
17
"nhooyr.io/websocket"
17
18
18
19
"cdr.dev/slog"
@@ -374,26 +375,42 @@ func (api *API) followLogs(actor rbac.Subject, jobID uuid.UUID) (<-chan *databas
374
375
logger := api .Logger .With (slog .F ("job_id" , jobID ))
375
376
376
377
var (
377
- closed = make (chan struct {})
378
- bufferedLogs = make (chan * database.ProvisionerJobLog , 128 )
379
- logMut = & sync.Mutex {}
378
+ closeMutex sync.Mutex
379
+ bufferedLogs = make (chan * database.ProvisionerJobLog , 128 )
380
+ endOfLogs atomic.Bool
381
+ lastSentLogID atomic.Int64
380
382
)
383
+
384
+ sendLog := func (log * database.ProvisionerJobLog ) {
385
+ closeMutex .Lock ()
386
+ defer closeMutex .Unlock ()
387
+ select {
388
+ case bufferedLogs <- log :
389
+ logger .Debug (context .Background (), "subscribe buffered log" , slog .F ("stage" , log .Stage ))
390
+ lastSentLogID .Store (log .ID )
391
+ default :
392
+ // If this overflows users could miss logs streaming. This can happen
393
+ // we get a lot of logs and consumer isn't keeping up. We don't want to block the pubsub,
394
+ // so just drop them.
395
+ logger .Warn (context .Background (), "provisioner job log overflowing channel" )
396
+ }
397
+ }
398
+
381
399
closeSubscribe , err := api .Pubsub .Subscribe (
382
400
provisionerJobLogsChannel (jobID ),
383
401
func (ctx context.Context , message []byte ) {
384
- select {
385
- case <- closed :
402
+ if endOfLogs . Load () {
403
+ // Logs have already ended!
386
404
return
387
- default :
388
405
}
389
-
390
406
jlMsg := provisionerJobLogsMessage {}
391
407
err := json .Unmarshal (message , & jlMsg )
392
408
if err != nil {
393
409
logger .Warn (ctx , "invalid provisioner job log on channel" , slog .Error (err ))
394
410
return
395
411
}
396
412
413
+ // CreatedAfter is sent when logs are streaming!
397
414
if jlMsg .CreatedAfter != 0 {
398
415
logs , err := api .Database .GetProvisionerLogsByIDBetween (dbauthz .As (ctx , actor ), database.GetProvisionerLogsByIDBetweenParams {
399
416
JobID : jobID ,
@@ -403,54 +420,50 @@ func (api *API) followLogs(actor rbac.Subject, jobID uuid.UUID) (<-chan *databas
403
420
logger .Warn (ctx , "get provisioner logs" , slog .Error (err ))
404
421
return
405
422
}
406
-
407
423
for _ , log := range logs {
408
- // Sadly we have to use a mutex here because events may be
409
- // handled out of order due to golang goroutine scheduling
410
- // semantics (even though Postgres guarantees ordering of
411
- // notifications).
412
- logMut .Lock ()
413
- select {
414
- case <- closed :
415
- logMut .Unlock ()
424
+ if endOfLogs .Load () {
425
+ // An end of logs message came in while we were fetching
426
+ // logs or processing them!
416
427
return
417
- default :
418
428
}
419
429
log := log
420
- select {
421
- case bufferedLogs <- & log :
422
- logger .Debug (ctx , "subscribe buffered log" , slog .F ("stage" , log .Stage ))
423
- default :
424
- // If this overflows users could miss logs streaming. This can happen
425
- // we get a lot of logs and consumer isn't keeping up. We don't want to block the pubsub,
426
- // so just drop them.
427
- logger .Warn (ctx , "provisioner job log overflowing channel" )
428
- }
429
- logMut .Unlock ()
430
+ sendLog (& log )
430
431
}
431
432
}
432
433
434
+ // EndOfLogs is sent when logs are done streaming.
435
+ // We don't want to end the stream until we've sent all the logs,
436
+ // so we fetch logs after the last ID we've seen and send them!
433
437
if jlMsg .EndOfLogs {
434
- // This mutex is to guard double-closes.
435
- logMut .Lock ()
436
- select {
437
- case <- closed :
438
- logMut .Unlock ()
438
+ endOfLogs .Store (true )
439
+ logs , err := api .Database .GetProvisionerLogsByIDBetween (dbauthz .As (ctx , actor ), database.GetProvisionerLogsByIDBetweenParams {
440
+ JobID : jobID ,
441
+ CreatedAfter : lastSentLogID .Load (),
442
+ })
443
+ if err != nil {
444
+ logger .Warn (ctx , "get provisioner logs" , slog .Error (err ))
439
445
return
440
- default :
446
+ }
447
+ for _ , log := range logs {
448
+ log := log
449
+ sendLog (& log )
441
450
}
442
451
logger .Debug (ctx , "got End of Logs" )
452
+ closeMutex .Lock ()
443
453
bufferedLogs <- nil
444
- logMut .Unlock ()
454
+ closeMutex .Unlock ()
445
455
}
456
+
457
+ lastSentLogID .Store (jlMsg .CreatedAfter )
446
458
},
447
459
)
448
460
if err != nil {
449
461
return nil , nil , err
450
462
}
451
463
return bufferedLogs , func () {
464
+ closeMutex .Lock ()
465
+ defer closeMutex .Unlock ()
452
466
closeSubscribe ()
453
- close (closed )
454
467
close (bufferedLogs )
455
468
}, nil
456
469
}
0 commit comments