
Commit c64dca2

fix: fetch provisioner logs after end of logs message
I think this should _actually_ fix it. See https://github.com/coder/coder/actions/runs/4358242625/jobs/7618562167. The problem was that this loop runs asynchronously, so even though messages are published in order, we were processing them too slowly and the end-of-logs message was handled before the log messages that preceded it.
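In other words: Postgres `NOTIFY` delivers messages in order, but the subscriber may still observe `EndOfLogs` before it has fetched and forwarded the earlier log rows. The fix records the last log ID actually delivered and, on end-of-logs, backfills everything after it from the database before closing the stream. Below is a minimal, self-contained sketch of that pattern; `Log` and `fetchLogsAfter` are illustrative stand-ins for `database.ProvisionerJobLog` and `GetProvisionerLogsByIDBetween`, not the real coderd API:

```go
package main

import (
	"fmt"
	"sync"

	"go.uber.org/atomic"
)

// Log is an illustrative stand-in for database.ProvisionerJobLog.
type Log struct {
	ID    int64
	Stage string
}

// fetchLogsAfter is a hypothetical stand-in for
// GetProvisionerLogsByIDBetween: return all logs with ID > after.
func fetchLogsAfter(db []Log, after int64) []Log {
	var out []Log
	for _, l := range db {
		if l.ID > after {
			out = append(out, l)
		}
	}
	return out
}

func main() {
	var (
		closeMutex    sync.Mutex
		bufferedLogs  = make(chan *Log, 128)
		endOfLogs     atomic.Bool
		lastSentLogID atomic.Int64
	)

	// Non-blocking send that remembers the last ID it delivered.
	sendLog := func(l *Log) {
		closeMutex.Lock()
		defer closeMutex.Unlock()
		select {
		case bufferedLogs <- l:
			lastSentLogID.Store(l.ID)
		default:
			// Buffer full: drop rather than block the publisher.
		}
	}

	db := []Log{{1, "init"}, {2, "plan"}, {3, "apply"}}

	// Simulate the race: only log 1 was forwarded before the
	// end-of-logs message is handled.
	sendLog(&db[0])

	// End-of-logs handler: latch the flag, then backfill every log
	// published after the last ID we actually sent.
	endOfLogs.Store(true)
	for _, l := range fetchLogsAfter(db, lastSentLogID.Load()) {
		l := l
		sendLog(&l)
	}

	// A straggling streaming message would now be ignored.
	if !endOfLogs.Load() {
		sendLog(&db[1])
	}
	close(bufferedLogs)

	for l := range bufferedLogs {
		fmt.Println(l.ID, l.Stage) // 1 init, 2 plan, 3 apply
	}
}
```

Only a successful send advances `lastSentLogID`, so the end-of-logs backfill re-queries from the last log the consumer could actually have seen.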
1 parent bb0a996

1 file changed (+48 −35)

coderd/provisionerjobs.go

```diff
@@ -13,6 +13,7 @@ import (
 	"time"
 
 	"github.com/google/uuid"
+	"go.uber.org/atomic"
 	"nhooyr.io/websocket"
 
 	"cdr.dev/slog"
@@ -374,26 +375,42 @@ func (api *API) followLogs(actor rbac.Subject, jobID uuid.UUID) (<-chan *databas
 	logger := api.Logger.With(slog.F("job_id", jobID))
 
 	var (
-		closed       = make(chan struct{})
-		bufferedLogs = make(chan *database.ProvisionerJobLog, 128)
-		logMut       = &sync.Mutex{}
+		closeMutex    sync.Mutex
+		bufferedLogs  = make(chan *database.ProvisionerJobLog, 128)
+		endOfLogs     atomic.Bool
+		lastSentLogID atomic.Int64
 	)
+
+	sendLog := func(log *database.ProvisionerJobLog) {
+		closeMutex.Lock()
+		defer closeMutex.Unlock()
+		select {
+		case bufferedLogs <- log:
+			logger.Debug(context.Background(), "subscribe buffered log", slog.F("stage", log.Stage))
+			lastSentLogID.Store(log.ID)
+		default:
+			// If this overflows users could miss logs streaming. This can happen
+			// we get a lot of logs and consumer isn't keeping up. We don't want to block the pubsub,
+			// so just drop them.
+			logger.Warn(context.Background(), "provisioner job log overflowing channel")
+		}
+	}
+
 	closeSubscribe, err := api.Pubsub.Subscribe(
 		provisionerJobLogsChannel(jobID),
 		func(ctx context.Context, message []byte) {
-			select {
-			case <-closed:
+			if endOfLogs.Load() {
+				// Logs have already ended!
 				return
-			default:
 			}
-
 			jlMsg := provisionerJobLogsMessage{}
 			err := json.Unmarshal(message, &jlMsg)
 			if err != nil {
 				logger.Warn(ctx, "invalid provisioner job log on channel", slog.Error(err))
 				return
 			}
 
+			// CreatedAfter is sent when logs are streaming!
 			if jlMsg.CreatedAfter != 0 {
 				logs, err := api.Database.GetProvisionerLogsByIDBetween(dbauthz.As(ctx, actor), database.GetProvisionerLogsByIDBetweenParams{
 					JobID: jobID,
@@ -403,54 +420,50 @@ func (api *API) followLogs(actor rbac.Subject, jobID uuid.UUID) (<-chan *databas
 					logger.Warn(ctx, "get provisioner logs", slog.Error(err))
 					return
 				}
-
 				for _, log := range logs {
-					// Sadly we have to use a mutex here because events may be
-					// handled out of order due to golang goroutine scheduling
-					// semantics (even though Postgres guarantees ordering of
-					// notifications).
-					logMut.Lock()
-					select {
-					case <-closed:
-						logMut.Unlock()
+					if endOfLogs.Load() {
+						// An end of logs message came in while we were fetching
+						// logs or processing them!
 						return
-					default:
 					}
 					log := log
-					select {
-					case bufferedLogs <- &log:
-						logger.Debug(ctx, "subscribe buffered log", slog.F("stage", log.Stage))
-					default:
-						// If this overflows users could miss logs streaming. This can happen
-						// we get a lot of logs and consumer isn't keeping up. We don't want to block the pubsub,
-						// so just drop them.
-						logger.Warn(ctx, "provisioner job log overflowing channel")
-					}
-					logMut.Unlock()
+					sendLog(&log)
 				}
 			}
 
+			// EndOfLogs is sent when logs are done streaming.
+			// We don't want to end the stream until we've sent all the logs,
+			// so we fetch logs after the last ID we've seen and send them!
 			if jlMsg.EndOfLogs {
-				// This mutex is to guard double-closes.
-				logMut.Lock()
-				select {
-				case <-closed:
-					logMut.Unlock()
+				endOfLogs.Store(true)
+				logs, err := api.Database.GetProvisionerLogsByIDBetween(dbauthz.As(ctx, actor), database.GetProvisionerLogsByIDBetweenParams{
+					JobID:        jobID,
+					CreatedAfter: lastSentLogID.Load(),
+				})
+				if err != nil {
+					logger.Warn(ctx, "get provisioner logs", slog.Error(err))
 					return
-				default:
+				}
+				for _, log := range logs {
+					log := log
+					sendLog(&log)
 				}
 				logger.Debug(ctx, "got End of Logs")
+				closeMutex.Lock()
 				bufferedLogs <- nil
-				logMut.Unlock()
+				closeMutex.Unlock()
 			}
+
+			lastSentLogID.Store(jlMsg.CreatedAfter)
 		},
 	)
 	if err != nil {
 		return nil, nil, err
 	}
 	return bufferedLogs, func() {
+		closeMutex.Lock()
+		defer closeMutex.Unlock()
 		closeSubscribe()
-		close(closed)
 		close(bufferedLogs)
 	}, nil
 }
```
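One more detail worth noting in the close path: `sendLog` and the returned closer share `closeMutex`, and sending on a closed channel panics in Go, so the mutex is what makes `close(bufferedLogs)` safe while pubsub callbacks may still be firing. A stripped-down sketch of that guard follows; the `safeChan` type and its `closed` flag are illustrative assumptions, making explicit what the real code gets from cancelling the subscription under the same lock:

```go
package main

import (
	"fmt"
	"sync"
)

// safeChan pairs a buffered channel with a mutex so that a
// non-blocking send can never race with close.
type safeChan struct {
	mu     sync.Mutex
	ch     chan int
	closed bool
}

func newSafeChan(size int) *safeChan {
	return &safeChan{ch: make(chan int, size)}
}

// send delivers v unless the channel is closed or full; like
// sendLog, it never blocks the publisher.
func (s *safeChan) send(v int) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.closed {
		return // stream already torn down by the consumer
	}
	select {
	case s.ch <- v:
	default:
		// Buffer full: drop instead of blocking.
	}
}

// close takes the same mutex, so no send can be mid-flight when
// the channel is actually closed.
func (s *safeChan) close() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.closed = true
	close(s.ch)
}

func main() {
	s := newSafeChan(2)
	s.send(1)
	s.send(2)
	s.close()
	s.send(3) // ignored: no send-on-closed-channel panic
	for v := range s.ch {
		fmt.Println(v) // 1, then 2
	}
}
```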
