Skip to content

Commit 869b13d

Browse files
committed
Revert to only synchronous collection; background collection is not worth the complexity
Signed-off-by: Danny Kopping <danny@coder.com>
1 parent 361538c commit 869b13d

File tree

7 files changed

+62
-135
lines changed

7 files changed

+62
-135
lines changed

cli/server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -678,7 +678,7 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
678678
}()
679679

680680
options.Database = database.New(sqlDB)
681-
ps, err := pubsub.New(ctx, logger.Named("pubsub"), sqlDB, dbURL, pubsub.LatencyMeasureInterval)
681+
ps, err := pubsub.New(ctx, logger.Named("pubsub"), sqlDB, dbURL)
682682
if err != nil {
683683
return xerrors.Errorf("create pubsub: %w", err)
684684
}

coderd/database/dbtestutil/db.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ func NewDB(t testing.TB, opts ...Option) (database.Store, pubsub.Pubsub) {
127127
}
128128
db = database.New(sqlDB)
129129

130-
ps, err = pubsub.New(context.Background(), o.logger, sqlDB, connectionURL, pubsub.LatencyMeasureInterval)
130+
ps, err = pubsub.New(context.Background(), o.logger, sqlDB, connectionURL)
131131
require.NoError(t, err)
132132
t.Cleanup(func() {
133133
_ = ps.Close()

coderd/database/pubsub/latency.go

Lines changed: 7 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"bytes"
55
"context"
66
"fmt"
7-
"sync/atomic"
87
"time"
98

109
"github.com/google/uuid"
@@ -19,16 +18,6 @@ type LatencyMeasurer struct {
1918
// Create unique pubsub channel names so that multiple coderd replicas do not clash when performing latency measurements.
2019
channel uuid.UUID
2120
logger slog.Logger
22-
23-
// background measurement members
24-
collections atomic.Int64
25-
last atomic.Value
26-
asyncCancel context.CancelCauseFunc
27-
}
28-
29-
type LatencyMeasurement struct {
30-
Send, Recv time.Duration
31-
Err error
3221
}
3322

3423
// LatencyMessageLength is the length of a UUIDv4 encoded to hex.
@@ -42,7 +31,7 @@ func NewLatencyMeasurer(logger slog.Logger) *LatencyMeasurer {
4231
}
4332

4433
// Measure takes a given Pubsub implementation, publishes a message & immediately receives it, and returns the observed latency.
45-
func (lm *LatencyMeasurer) Measure(ctx context.Context, p Pubsub) LatencyMeasurement {
34+
func (lm *LatencyMeasurer) Measure(ctx context.Context, p Pubsub) (send, recv time.Duration, err error) {
4635
var (
4736
start time.Time
4837
res = make(chan time.Duration, 1)
@@ -60,77 +49,23 @@ func (lm *LatencyMeasurer) Measure(ctx context.Context, p Pubsub) LatencyMeasure
6049
res <- time.Since(start)
6150
})
6251
if err != nil {
63-
return LatencyMeasurement{Send: -1, Recv: -1, Err: xerrors.Errorf("failed to subscribe: %w", err)}
52+
return -1, -1, xerrors.Errorf("failed to subscribe: %w", err)
6453
}
6554
defer cancel()
6655

6756
start = time.Now()
6857
err = p.Publish(lm.latencyChannelName(), msg)
6958
if err != nil {
70-
return LatencyMeasurement{Send: -1, Recv: -1, Err: xerrors.Errorf("failed to publish: %w", err)}
59+
return -1, -1, xerrors.Errorf("failed to publish: %w", err)
7160
}
7261

73-
send := time.Since(start)
74-
62+
send = time.Since(start)
7563
select {
7664
case <-ctx.Done():
7765
lm.logger.Error(ctx, "context canceled before message could be received", slog.Error(ctx.Err()), slog.F("msg", msg))
78-
return LatencyMeasurement{Send: send, Recv: -1, Err: ctx.Err()}
79-
case recv := <-res:
80-
return LatencyMeasurement{Send: send, Recv: recv}
81-
}
82-
}
83-
84-
// MeasureAsync runs latency measurements asynchronously on a given interval.
85-
// This function is expected to be run in a goroutine and will exit when the context is canceled.
86-
func (lm *LatencyMeasurer) MeasureAsync(ctx context.Context, p Pubsub, interval time.Duration) {
87-
tick := time.NewTicker(interval)
88-
defer tick.Stop()
89-
90-
ctx, cancel := context.WithCancelCause(ctx)
91-
lm.asyncCancel = cancel
92-
93-
for {
94-
// run immediately on first call, then sleep a tick before each invocation
95-
if p == nil {
96-
lm.logger.Error(ctx, "given pubsub is nil")
97-
return
98-
}
99-
100-
lm.collections.Add(1)
101-
measure := lm.Measure(ctx, p)
102-
lm.last.Store(&measure)
103-
104-
select {
105-
case <-tick.C:
106-
continue
107-
108-
// bail out if signaled
109-
case <-ctx.Done():
110-
lm.logger.Debug(ctx, "async measurement canceled", slog.Error(ctx.Err()))
111-
return
112-
}
113-
}
114-
}
115-
116-
func (lm *LatencyMeasurer) LastMeasurement() *LatencyMeasurement {
117-
val := lm.last.Load()
118-
if val == nil {
119-
return nil
120-
}
121-
122-
// nolint:forcetypeassert // Unnecessary type check.
123-
return val.(*LatencyMeasurement)
124-
}
125-
126-
func (lm *LatencyMeasurer) MeasurementCount() int64 {
127-
return lm.collections.Load()
128-
}
129-
130-
// Stop stops any background measurements.
131-
func (lm *LatencyMeasurer) Stop() {
132-
if lm.asyncCancel != nil {
133-
lm.asyncCancel(xerrors.New("stopped"))
66+
return send, -1, ctx.Err()
67+
case recv = <-res:
68+
return send, recv, nil
13469
}
13570
}
13671

coderd/database/pubsub/pubsub.go

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -210,8 +210,9 @@ type PGPubsub struct {
210210
disconnectionsTotal prometheus.Counter
211211
connected prometheus.Gauge
212212

213-
latencyMeasurer *LatencyMeasurer
214-
latencyErrCounter atomic.Int64
213+
latencyMeasurer *LatencyMeasurer
214+
latencyMeasureCounter atomic.Int64
215+
latencyErrCounter atomic.Int64
215216
}
216217

217218
// BufferSize is the maximum number of unhandled messages we will buffer
@@ -311,8 +312,6 @@ func (p *PGPubsub) Close() error {
311312
err := p.closeListener()
312313
<-p.listenDone
313314
p.logger.Debug(context.Background(), "pubsub closed")
314-
p.latencyMeasurer.Stop()
315-
p.logger.Debug(context.Background(), "background latency measurement has stopped")
316315
return err
317316
}
318317

@@ -569,32 +568,28 @@ func (p *PGPubsub) Collect(metrics chan<- prometheus.Metric) {
569568
metrics <- prometheus.MustNewConstMetric(currentEventsDesc, prometheus.GaugeValue, float64(events))
570569

571570
// additional metrics
572-
latency := p.latencyMeasurer.LastMeasurement()
573-
if latency == nil {
574-
p.logger.Debug(context.Background(), "latency measurement not completed yet")
575-
return
576-
}
571+
ctx, cancel := context.WithTimeout(context.Background(), LatencyMeasureInterval)
572+
defer cancel()
573+
send, recv, err := p.latencyMeasurer.Measure(ctx, p)
577574

578-
metrics <- prometheus.MustNewConstMetric(pubsubLatencyMeasureCountDesc, prometheus.CounterValue, float64(p.latencyMeasurer.MeasurementCount()))
579-
if latency.Err != nil {
580-
p.logger.Warn(context.Background(), "failed to measure latency", slog.Error(latency.Err))
575+
metrics <- prometheus.MustNewConstMetric(pubsubLatencyMeasureCountDesc, prometheus.CounterValue, float64(p.latencyMeasureCounter.Add(1)))
576+
if err != nil {
577+
p.logger.Warn(context.Background(), "failed to measure latency", slog.Error(err))
581578
metrics <- prometheus.MustNewConstMetric(pubsubLatencyMeasureErrDesc, prometheus.CounterValue, float64(p.latencyErrCounter.Add(1)))
582579
return
583580
}
584-
metrics <- prometheus.MustNewConstMetric(pubsubSendLatencyDesc, prometheus.GaugeValue, latency.Send.Seconds())
585-
metrics <- prometheus.MustNewConstMetric(pubsubRecvLatencyDesc, prometheus.GaugeValue, latency.Recv.Seconds())
581+
metrics <- prometheus.MustNewConstMetric(pubsubSendLatencyDesc, prometheus.GaugeValue, send.Seconds())
582+
metrics <- prometheus.MustNewConstMetric(pubsubRecvLatencyDesc, prometheus.GaugeValue, recv.Seconds())
586583
}
587584

588585
// New creates a new Pubsub implementation using a PostgreSQL connection.
589-
func New(startCtx context.Context, logger slog.Logger, database *sql.DB, connectURL string, latencyMeasureInterval time.Duration) (*PGPubsub, error) {
586+
func New(startCtx context.Context, logger slog.Logger, database *sql.DB, connectURL string) (*PGPubsub, error) {
590587
p := newWithoutListener(logger, database)
591588
if err := p.startListener(startCtx, connectURL); err != nil {
592589
return nil, err
593590
}
594591
go p.listen()
595592
logger.Info(startCtx, "pubsub has started")
596-
go p.latencyMeasurer.MeasureAsync(startCtx, p, latencyMeasureInterval)
597-
logger.Debug(startCtx, "background latency measurement has started")
598593
return p, nil
599594
}
600595

coderd/database/pubsub/pubsub_linux_test.go

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func TestPubsub(t *testing.T) {
4545
db, err := sql.Open("postgres", connectionURL)
4646
require.NoError(t, err)
4747
defer db.Close()
48-
pubsub, err := pubsub.New(ctx, logger, db, connectionURL, pubsub.LatencyMeasureInterval)
48+
pubsub, err := pubsub.New(ctx, logger, db, connectionURL)
4949
require.NoError(t, err)
5050
defer pubsub.Close()
5151
event := "test"
@@ -74,7 +74,7 @@ func TestPubsub(t *testing.T) {
7474
db, err := sql.Open("postgres", connectionURL)
7575
require.NoError(t, err)
7676
defer db.Close()
77-
pubsub, err := pubsub.New(ctx, logger, db, connectionURL, pubsub.LatencyMeasureInterval)
77+
pubsub, err := pubsub.New(ctx, logger, db, connectionURL)
7878
require.NoError(t, err)
7979
defer pubsub.Close()
8080
cancelFunc()
@@ -90,7 +90,7 @@ func TestPubsub(t *testing.T) {
9090
db, err := sql.Open("postgres", connectionURL)
9191
require.NoError(t, err)
9292
defer db.Close()
93-
pubsub, err := pubsub.New(ctx, logger, db, connectionURL, pubsub.LatencyMeasureInterval)
93+
pubsub, err := pubsub.New(ctx, logger, db, connectionURL)
9494
require.NoError(t, err)
9595
defer pubsub.Close()
9696

@@ -127,7 +127,7 @@ func TestPubsub_ordering(t *testing.T) {
127127
db, err := sql.Open("postgres", connectionURL)
128128
require.NoError(t, err)
129129
defer db.Close()
130-
ps, err := pubsub.New(ctx, logger, db, connectionURL, pubsub.LatencyMeasureInterval)
130+
ps, err := pubsub.New(ctx, logger, db, connectionURL)
131131
require.NoError(t, err)
132132
defer ps.Close()
133133
event := "test"
@@ -176,7 +176,7 @@ func TestPubsub_Disconnect(t *testing.T) {
176176
ctx, cancelFunc := context.WithTimeout(context.Background(), testutil.WaitSuperLong)
177177
defer cancelFunc()
178178
logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug)
179-
ps, err := pubsub.New(ctx, logger, db, connectionURL, pubsub.LatencyMeasureInterval)
179+
ps, err := pubsub.New(ctx, logger, db, connectionURL)
180180
require.NoError(t, err)
181181
defer ps.Close()
182182
event := "test"
@@ -308,7 +308,7 @@ func TestMeasureLatency(t *testing.T) {
308308
require.NoError(t, err)
309309
db, err := sql.Open("postgres", connectionURL)
310310
require.NoError(t, err)
311-
ps, err := pubsub.New(ctx, logger, db, connectionURL, pubsub.LatencyMeasureInterval)
311+
ps, err := pubsub.New(ctx, logger, db, connectionURL)
312312
require.NoError(t, err)
313313

314314
return ps, func() {
@@ -329,10 +329,10 @@ func TestMeasureLatency(t *testing.T) {
329329
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
330330
defer cancel()
331331

332-
l := pubsub.NewLatencyMeasurer(logger).Measure(ctx, ps)
333-
require.NoError(t, l.Err)
334-
require.Greater(t, l.Send.Seconds(), 0.0)
335-
require.Greater(t, l.Recv.Seconds(), 0.0)
332+
send, recv, err := pubsub.NewLatencyMeasurer(logger).Measure(ctx, ps)
333+
require.NoError(t, err)
334+
require.Greater(t, send.Seconds(), 0.0)
335+
require.Greater(t, recv.Seconds(), 0.0)
336336
})
337337

338338
t.Run("MeasureLatencyRecvTimeout", func(t *testing.T) {
@@ -345,10 +345,10 @@ func TestMeasureLatency(t *testing.T) {
345345
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(-time.Hour))
346346
defer cancel()
347347

348-
l := pubsub.NewLatencyMeasurer(logger).Measure(ctx, ps)
349-
require.ErrorContains(t, l.Err, context.DeadlineExceeded.Error())
350-
require.Greater(t, l.Send.Seconds(), 0.0)
351-
require.EqualValues(t, l.Recv, time.Duration(-1))
348+
send, recv, err := pubsub.NewLatencyMeasurer(logger).Measure(ctx, ps)
349+
require.ErrorContains(t, err, context.DeadlineExceeded.Error())
350+
require.Greater(t, send.Seconds(), 0.0)
351+
require.EqualValues(t, recv, time.Duration(-1))
352352
})
353353

354354
t.Run("MeasureLatencyNotifyRace", func(t *testing.T) {
@@ -366,10 +366,10 @@ func TestMeasureLatency(t *testing.T) {
366366
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
367367
defer cancel()
368368

369-
l := lm.Measure(ctx, racy)
370-
assert.NoError(t, l.Err)
371-
assert.Greater(t, l.Send.Seconds(), 0.0)
372-
assert.Greater(t, l.Recv.Seconds(), 0.0)
369+
send, recv, err := lm.Measure(ctx, racy)
370+
assert.NoError(t, err)
371+
assert.Greater(t, send.Seconds(), 0.0)
372+
assert.Greater(t, recv.Seconds(), 0.0)
373373

374374
logger.Sync()
375375
assert.Contains(t, buf.String(), "received unexpected message")

coderd/database/pubsub/pubsub_test.go

Lines changed: 21 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"database/sql"
66
"testing"
7-
"time"
87

98
"github.com/prometheus/client_golang/prometheus"
109
"github.com/stretchr/testify/assert"
@@ -33,16 +32,18 @@ func TestPGPubsub_Metrics(t *testing.T) {
3332
registry := prometheus.NewRegistry()
3433
ctx := testutil.Context(t, testutil.WaitLong)
3534

36-
latencyMeasureInterval := time.Second
37-
start := time.Now()
38-
uut, err := pubsub.New(ctx, logger, db, connectionURL, latencyMeasureInterval)
35+
uut, err := pubsub.New(ctx, logger, db, connectionURL)
3936
require.NoError(t, err)
4037
defer uut.Close()
4138

4239
err = registry.Register(uut)
4340
require.NoError(t, err)
4441

42+
// each Gather measures pubsub latency by publishing a message & subscribing to it
43+
var gatherCount float64
44+
4545
metrics, err := registry.Gather()
46+
gatherCount++
4647
require.NoError(t, err)
4748
require.True(t, testutil.PromGaugeHasValue(t, metrics, 0, "coder_pubsub_current_events"))
4849
require.True(t, testutil.PromGaugeHasValue(t, metrics, 0, "coder_pubsub_current_subscribers"))
@@ -62,24 +63,21 @@ func TestPGPubsub_Metrics(t *testing.T) {
6263
_ = testutil.RequireRecvCtx(ctx, t, messageChannel)
6364

6465
require.Eventually(t, func() bool {
66+
latencyBytes := gatherCount * pubsub.LatencyMessageLength
6567
metrics, err = registry.Gather()
68+
gatherCount++
6669
assert.NoError(t, err)
67-
68-
latencyMeasurements := (time.Since(start).Truncate(latencyMeasureInterval).Seconds() / latencyMeasureInterval.Seconds()) + 1
69-
t.Log(latencyMeasurements)
70-
latencyMsgSize := pubsub.LatencyMessageLength * latencyMeasurements
71-
7270
return testutil.PromGaugeHasValue(t, metrics, 1, "coder_pubsub_current_events") &&
7371
testutil.PromGaugeHasValue(t, metrics, 1, "coder_pubsub_current_subscribers") &&
7472
testutil.PromGaugeHasValue(t, metrics, 1, "coder_pubsub_connected") &&
75-
testutil.PromCounterHasValue(t, metrics, 1+latencyMeasurements, "coder_pubsub_publishes_total", "true") &&
76-
testutil.PromCounterHasValue(t, metrics, 1+latencyMeasurements, "coder_pubsub_subscribes_total", "true") &&
77-
testutil.PromCounterHasValue(t, metrics, 1+latencyMeasurements, "coder_pubsub_messages_total", "normal") &&
78-
testutil.PromCounterHasValue(t, metrics, float64(len(data))+latencyMsgSize, "coder_pubsub_received_bytes_total") &&
79-
testutil.PromCounterHasValue(t, metrics, float64(len(data))+latencyMsgSize, "coder_pubsub_published_bytes_total") &&
73+
testutil.PromCounterHasValue(t, metrics, gatherCount, "coder_pubsub_publishes_total", "true") &&
74+
testutil.PromCounterHasValue(t, metrics, gatherCount, "coder_pubsub_subscribes_total", "true") &&
75+
testutil.PromCounterHasValue(t, metrics, gatherCount, "coder_pubsub_messages_total", "normal") &&
76+
testutil.PromCounterHasValue(t, metrics, float64(len(data))+latencyBytes, "coder_pubsub_received_bytes_total") &&
77+
testutil.PromCounterHasValue(t, metrics, float64(len(data))+latencyBytes, "coder_pubsub_published_bytes_total") &&
8078
testutil.PromGaugeAssertion(t, metrics, func(in float64) bool { return in > 0 }, "coder_pubsub_send_latency_seconds") &&
8179
testutil.PromGaugeAssertion(t, metrics, func(in float64) bool { return in > 0 }, "coder_pubsub_receive_latency_seconds") &&
82-
testutil.PromCounterHasValue(t, metrics, latencyMeasurements, "coder_pubsub_latency_measures_total") &&
80+
testutil.PromCounterHasValue(t, metrics, gatherCount, "coder_pubsub_latency_measures_total") &&
8381
!testutil.PromCounterGathered(t, metrics, "coder_pubsub_latency_measure_errs_total")
8482
}, testutil.WaitShort, testutil.IntervalFast)
8583

@@ -102,25 +100,22 @@ func TestPGPubsub_Metrics(t *testing.T) {
102100
_ = testutil.RequireRecvCtx(ctx, t, messageChannel)
103101

104102
require.Eventually(t, func() bool {
103+
latencyBytes := gatherCount * pubsub.LatencyMessageLength
105104
metrics, err = registry.Gather()
105+
gatherCount++
106106
assert.NoError(t, err)
107-
108-
latencyMeasurements := (time.Since(start).Truncate(latencyMeasureInterval).Seconds() / latencyMeasureInterval.Seconds()) + 1
109-
latencyMsgSize := pubsub.LatencyMessageLength * latencyMeasurements
110-
t.Log(latencyMeasurements)
111-
112107
return testutil.PromGaugeHasValue(t, metrics, 1, "coder_pubsub_current_events") &&
113108
testutil.PromGaugeHasValue(t, metrics, 2, "coder_pubsub_current_subscribers") &&
114109
testutil.PromGaugeHasValue(t, metrics, 1, "coder_pubsub_connected") &&
115-
testutil.PromCounterHasValue(t, metrics, 2+latencyMeasurements, "coder_pubsub_publishes_total", "true") &&
116-
testutil.PromCounterHasValue(t, metrics, 2+latencyMeasurements, "coder_pubsub_subscribes_total", "true") &&
117-
testutil.PromCounterHasValue(t, metrics, 1+latencyMeasurements, "coder_pubsub_messages_total", "normal") &&
110+
testutil.PromCounterHasValue(t, metrics, 1+gatherCount, "coder_pubsub_publishes_total", "true") &&
111+
testutil.PromCounterHasValue(t, metrics, 1+gatherCount, "coder_pubsub_subscribes_total", "true") &&
112+
testutil.PromCounterHasValue(t, metrics, gatherCount, "coder_pubsub_messages_total", "normal") &&
118113
testutil.PromCounterHasValue(t, metrics, 1, "coder_pubsub_messages_total", "colossal") &&
119-
testutil.PromCounterHasValue(t, metrics, float64(colossalSize+len(data))+latencyMsgSize, "coder_pubsub_received_bytes_total") &&
120-
testutil.PromCounterHasValue(t, metrics, float64(colossalSize+len(data))+latencyMsgSize, "coder_pubsub_published_bytes_total") &&
114+
testutil.PromCounterHasValue(t, metrics, float64(colossalSize+len(data))+latencyBytes, "coder_pubsub_received_bytes_total") &&
115+
testutil.PromCounterHasValue(t, metrics, float64(colossalSize+len(data))+latencyBytes, "coder_pubsub_published_bytes_total") &&
121116
testutil.PromGaugeAssertion(t, metrics, func(in float64) bool { return in > 0 }, "coder_pubsub_send_latency_seconds") &&
122117
testutil.PromGaugeAssertion(t, metrics, func(in float64) bool { return in > 0 }, "coder_pubsub_receive_latency_seconds") &&
123-
testutil.PromCounterHasValue(t, metrics, latencyMeasurements, "coder_pubsub_latency_measures_total") &&
118+
testutil.PromCounterHasValue(t, metrics, gatherCount, "coder_pubsub_latency_measures_total") &&
124119
!testutil.PromCounterGathered(t, metrics, "coder_pubsub_latency_measure_errs_total")
125120
}, testutil.WaitShort, testutil.IntervalFast)
126121
}

0 commit comments

Comments (0)