Skip to content

Commit bf89a2d

Browse files
committed
Stop async measurements on pubsub close
Refactor async measurement for immediate exit upon signal Signed-off-by: Danny Kopping <danny@coder.com>
1 parent 722a233 commit bf89a2d

File tree

2 files changed

+36
-11
lines changed

2 files changed

+36
-11
lines changed

coderd/database/pubsub/latency.go

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,11 @@ type LatencyMeasurer struct {
2020
channel uuid.UUID
2121
logger slog.Logger
2222

23+
// background measurement members
2324
collections atomic.Int64
2425
last atomic.Value
26+
asyncTick *time.Ticker
27+
stop chan struct{}
2528
}
2629

2730
type LatencyMeasurement struct {
@@ -36,6 +39,7 @@ func NewLatencyMeasurer(logger slog.Logger) *LatencyMeasurer {
3639
return &LatencyMeasurer{
3740
channel: uuid.New(),
3841
logger: logger,
42+
stop: make(chan struct{}, 1),
3943
}
4044
}
4145

@@ -47,6 +51,7 @@ func (lm *LatencyMeasurer) Measure(ctx context.Context, p Pubsub) LatencyMeasure
4751
)
4852

4953
msg := []byte(uuid.New().String())
54+
lm.logger.Debug(ctx, "performing measurement", slog.F("msg", msg))
5055

5156
cancel, err := p.Subscribe(lm.latencyChannelName(), func(ctx context.Context, in []byte) {
5257
if !bytes.Equal(in, msg) {
@@ -81,23 +86,32 @@ func (lm *LatencyMeasurer) Measure(ctx context.Context, p Pubsub) LatencyMeasure
8186
// MeasureAsync runs latency measurements asynchronously on a given interval.
8287
// This function is expected to be run in a goroutine and will exit when the context is canceled.
8388
func (lm *LatencyMeasurer) MeasureAsync(ctx context.Context, p Pubsub, interval time.Duration) {
84-
tick := time.NewTicker(interval)
85-
defer tick.Stop()
86-
87-
for ; true; <-tick.C { // tick immediately
88-
select {
89-
case <-ctx.Done():
89+
lm.asyncTick = time.NewTicker(interval)
90+
defer lm.asyncTick.Stop()
91+
92+
loop:
93+
for {
94+
// run immediately on first call, then sleep a tick before each invocation
95+
if p == nil {
96+
lm.logger.Error(ctx, "given pubsub is nil")
9097
return
91-
default:
92-
if p == nil {
93-
lm.logger.Error(ctx, "given pubsub is nil")
94-
return
95-
}
9698
}
9799

98100
lm.collections.Add(1)
99101
measure := lm.Measure(ctx, p)
100102
lm.last.Store(&measure)
103+
104+
select {
105+
case <-lm.asyncTick.C:
106+
continue
107+
108+
// bail out if signaled
109+
case <-lm.stop:
110+
break loop
111+
case <-ctx.Done():
112+
lm.logger.Debug(ctx, "async measurement context canceled", slog.Error(ctx.Err()))
113+
break loop
114+
}
101115
}
102116
}
103117

@@ -115,6 +129,15 @@ func (lm *LatencyMeasurer) MeasurementCount() int64 {
115129
return lm.collections.Load()
116130
}
117131

132+
// Stop stops any background measurements.
133+
func (lm *LatencyMeasurer) Stop() {
134+
if lm.asyncTick == nil {
135+
return
136+
}
137+
lm.asyncTick.Stop()
138+
lm.stop <- struct{}{}
139+
}
140+
118141
func (lm *LatencyMeasurer) latencyChannelName() string {
119142
return fmt.Sprintf("latency-measure:%s", lm.channel)
120143
}

coderd/database/pubsub/pubsub.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,8 @@ func (p *PGPubsub) Close() error {
311311
err := p.closeListener()
312312
<-p.listenDone
313313
p.logger.Debug(context.Background(), "pubsub closed")
314+
p.latencyMeasurer.Stop()
315+
p.logger.Debug(context.Background(), "background latency measurement has stopped")
314316
return err
315317
}
316318

0 commit comments

Comments
 (0)