Skip to content

Commit ff73789

Browse files
committed
Stop async measurements on pubsub close
Refactor async measurement for immediate exit upon signal Signed-off-by: Danny Kopping <danny@coder.com>
1 parent 722a233 commit ff73789

File tree

2 files changed

+34
-10
lines changed

2 files changed

+34
-10
lines changed

coderd/database/pubsub/latency.go

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,11 @@ type LatencyMeasurer struct {
2020
channel uuid.UUID
2121
logger slog.Logger
2222

23+
// background measurement members
2324
collections atomic.Int64
2425
last atomic.Value
26+
asyncTick *time.Ticker
27+
stop chan struct{}
2528
}
2629

2730
type LatencyMeasurement struct {
@@ -36,6 +39,7 @@ func NewLatencyMeasurer(logger slog.Logger) *LatencyMeasurer {
3639
return &LatencyMeasurer{
3740
channel: uuid.New(),
3841
logger: logger,
42+
stop: make(chan struct{}, 1),
3943
}
4044
}
4145

@@ -47,6 +51,7 @@ func (lm *LatencyMeasurer) Measure(ctx context.Context, p Pubsub) LatencyMeasure
4751
)
4852

4953
msg := []byte(uuid.New().String())
54+
lm.logger.Debug(ctx, "performing measurement", slog.F("msg", msg))
5055

5156
cancel, err := p.Subscribe(lm.latencyChannelName(), func(ctx context.Context, in []byte) {
5257
if !bytes.Equal(in, msg) {
@@ -81,23 +86,31 @@ func (lm *LatencyMeasurer) Measure(ctx context.Context, p Pubsub) LatencyMeasure
8186
// MeasureAsync runs latency measurements asynchronously on a given interval.
8287
// This function is expected to be run in a goroutine and will exit when the context is canceled.
8388
func (lm *LatencyMeasurer) MeasureAsync(ctx context.Context, p Pubsub, interval time.Duration) {
84-
tick := time.NewTicker(interval)
85-
defer tick.Stop()
89+
lm.asyncTick = time.NewTicker(interval)
90+
defer lm.asyncTick.Stop()
8691

87-
for ; true; <-tick.C { // tick immediately
88-
select {
89-
case <-ctx.Done():
92+
for {
93+
// run immediately on first call, then sleep a tick before each invocation
94+
if p == nil {
95+
lm.logger.Error(ctx, "given pubsub is nil")
9096
return
91-
default:
92-
if p == nil {
93-
lm.logger.Error(ctx, "given pubsub is nil")
94-
return
95-
}
9697
}
9798

9899
lm.collections.Add(1)
99100
measure := lm.Measure(ctx, p)
100101
lm.last.Store(&measure)
102+
103+
select {
104+
case <-lm.asyncTick.C:
105+
continue
106+
107+
// bail out if signaled
108+
case <-lm.stop:
109+
return
110+
case <-ctx.Done():
111+
lm.logger.Debug(ctx, "async measurement context canceled", slog.Error(ctx.Err()))
112+
return
113+
}
101114
}
102115
}
103116

@@ -115,6 +128,15 @@ func (lm *LatencyMeasurer) MeasurementCount() int64 {
115128
return lm.collections.Load()
116129
}
117130

131+
// Stop stops any background measurements.
132+
func (lm *LatencyMeasurer) Stop() {
133+
if lm.asyncTick == nil {
134+
return
135+
}
136+
lm.asyncTick.Stop()
137+
lm.stop <- struct{}{}
138+
}
139+
118140
func (lm *LatencyMeasurer) latencyChannelName() string {
119141
return fmt.Sprintf("latency-measure:%s", lm.channel)
120142
}

coderd/database/pubsub/pubsub.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,8 @@ func (p *PGPubsub) Close() error {
311311
err := p.closeListener()
312312
<-p.listenDone
313313
p.logger.Debug(context.Background(), "pubsub closed")
314+
p.latencyMeasurer.Stop()
315+
p.logger.Debug(context.Background(), "background latency measurement has stopped")
314316
return err
315317
}
316318

0 commit comments

Comments
 (0)