Skip to content

Commit a7c042f

Browse files
committed
Measure latency in the background
Signed-off-by: Danny Kopping <danny@coder.com>
1 parent 68412c8 commit a7c042f

File tree

6 files changed

+132
-64
lines changed

6 files changed

+132
-64
lines changed

cli/server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -678,7 +678,7 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
678678
}()
679679

680680
options.Database = database.New(sqlDB)
681-
ps, err := pubsub.New(ctx, logger.Named("pubsub"), sqlDB, dbURL)
681+
ps, err := pubsub.New(ctx, logger.Named("pubsub"), sqlDB, dbURL, pubsub.LatencyMeasureInterval)
682682
if err != nil {
683683
return xerrors.Errorf("create pubsub: %w", err)
684684
}

coderd/database/dbtestutil/db.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ func NewDB(t testing.TB, opts ...Option) (database.Store, pubsub.Pubsub) {
127127
}
128128
db = database.New(sqlDB)
129129

130-
ps, err = pubsub.New(context.Background(), o.logger, sqlDB, connectionURL)
130+
ps, err = pubsub.New(context.Background(), o.logger, sqlDB, connectionURL, pubsub.LatencyMeasureInterval)
131131
require.NoError(t, err)
132132
t.Cleanup(func() {
133133
_ = ps.Close()

coderd/database/pubsub/latency.go

Lines changed: 55 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"bytes"
55
"context"
66
"fmt"
7+
"sync/atomic"
78
"time"
89

910
"github.com/google/uuid"
@@ -18,6 +19,14 @@ type LatencyMeasurer struct {
1819
// Create unique pubsub channel names so that multiple coderd replicas do not clash when performing latency measurements.
1920
channel uuid.UUID
2021
logger slog.Logger
22+
23+
collections atomic.Int64
24+
last atomic.Value
25+
}
26+
27+
type LatencyMeasurement struct {
28+
Send, Recv time.Duration
29+
Err error
2130
}
2231

2332
// LatencyMessageLength is the length of a UUIDv4 encoded to hex.
@@ -31,10 +40,10 @@ func NewLatencyMeasurer(logger slog.Logger) *LatencyMeasurer {
3140
}
3241

3342
// Measure takes a given Pubsub implementation, publishes a message & immediately receives it, and returns the observed latency.
34-
func (lm *LatencyMeasurer) Measure(ctx context.Context, p Pubsub) (send float64, recv float64, err error) {
43+
func (lm *LatencyMeasurer) Measure(ctx context.Context, p Pubsub) LatencyMeasurement {
3544
var (
3645
start time.Time
37-
res = make(chan float64, 1)
46+
res = make(chan time.Duration, 1)
3847
)
3948

4049
msg := []byte(uuid.New().String())
@@ -45,28 +54,65 @@ func (lm *LatencyMeasurer) Measure(ctx context.Context, p Pubsub) (send float64,
4554
return
4655
}
4756

48-
res <- time.Since(start).Seconds()
57+
res <- time.Since(start)
4958
})
5059
if err != nil {
51-
return -1, -1, xerrors.Errorf("failed to subscribe: %w", err)
60+
return LatencyMeasurement{Send: -1, Recv: -1, Err: xerrors.Errorf("failed to subscribe: %w", err)}
5261
}
5362
defer cancel()
5463

5564
start = time.Now()
5665
err = p.Publish(lm.latencyChannelName(), msg)
5766
if err != nil {
58-
return -1, -1, xerrors.Errorf("failed to publish: %w", err)
67+
return LatencyMeasurement{Send: -1, Recv: -1, Err: xerrors.Errorf("failed to publish: %w", err)}
5968
}
6069

61-
send = time.Since(start).Seconds()
70+
send := time.Since(start)
6271

6372
select {
6473
case <-ctx.Done():
6574
lm.logger.Error(ctx, "context canceled before message could be received", slog.Error(ctx.Err()), slog.F("msg", msg))
66-
return send, -1, ctx.Err()
67-
case val := <-res:
68-
return send, val, nil
75+
return LatencyMeasurement{Send: send, Recv: -1, Err: ctx.Err()}
76+
case recv := <-res:
77+
return LatencyMeasurement{Send: send, Recv: recv}
78+
}
79+
}
80+
81+
// MeasureAsync runs latency measurements asynchronously on a given interval.
82+
// This function is expected to be run in a goroutine and will exit when the context is canceled.
83+
func (lm *LatencyMeasurer) MeasureAsync(ctx context.Context, p Pubsub, interval time.Duration) {
84+
tick := time.NewTicker(interval)
85+
defer tick.Stop()
86+
87+
for ; true; <-tick.C { // tick immediately
88+
select {
89+
case <-ctx.Done():
90+
return
91+
default:
92+
if p == nil {
93+
lm.logger.Error(ctx, "given pubsub is nil")
94+
return
95+
}
96+
}
97+
98+
lm.collections.Add(1)
99+
measure := lm.Measure(ctx, p)
100+
lm.last.Store(&measure)
101+
}
102+
}
103+
104+
func (lm *LatencyMeasurer) LastMeasurement() *LatencyMeasurement {
105+
val := lm.last.Load()
106+
if val == nil {
107+
return nil
69108
}
109+
110+
// nolint:forcetypeassert // Unnecessary type check.
111+
return val.(*LatencyMeasurement)
112+
}
113+
114+
func (lm *LatencyMeasurer) MeasurementCount() int64 {
115+
return lm.collections.Load()
70116
}
71117

72118
func (lm *LatencyMeasurer) latencyChannelName() string {

coderd/database/pubsub/pubsub.go

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,11 @@ import (
99
"sync"
1010
"time"
1111

12+
"sync/atomic"
13+
1214
"github.com/google/uuid"
1315
"github.com/lib/pq"
1416
"github.com/prometheus/client_golang/prometheus"
15-
"go.uber.org/atomic"
1617
"golang.org/x/xerrors"
1718

1819
"cdr.dev/slog"
@@ -29,6 +30,9 @@ type ListenerWithErr func(ctx context.Context, message []byte, err error)
2930
// might have been dropped.
3031
var ErrDroppedMessages = xerrors.New("dropped messages")
3132

33+
// LatencyMeasureInterval defines how often to trigger a new background latency measurement.
34+
const LatencyMeasureInterval = time.Second * 10
35+
3236
// Pubsub is a generic interface for broadcasting and receiving messages.
3337
// Implementors should assume high-availability with the backing implementation.
3438
type Pubsub interface {
@@ -208,7 +212,7 @@ type PGPubsub struct {
208212
connected prometheus.Gauge
209213

210214
latencyMeasurer *LatencyMeasurer
211-
latencyErrCounter atomic.Float64
215+
latencyErrCounter atomic.Int64
212216
}
213217

214218
// BufferSize is the maximum number of unhandled messages we will buffer
@@ -494,6 +498,11 @@ var (
494498
"The time taken to receive a message from a pubsub event channel",
495499
nil, nil,
496500
)
501+
pubsubLatencyMeasureCountDesc = prometheus.NewDesc(
502+
"coder_pubsub_latency_measures_total",
503+
"The number of pubsub latency measurements",
504+
nil, nil,
505+
)
497506
pubsubLatencyMeasureErrDesc = prometheus.NewDesc(
498507
"coder_pubsub_latency_measure_errs_total",
499508
"The number of pubsub latency measurement failures",
@@ -531,6 +540,7 @@ func (p *PGPubsub) Describe(descs chan<- *prometheus.Desc) {
531540
// additional metrics
532541
descs <- pubsubSendLatencyDesc
533542
descs <- pubsubRecvLatencyDesc
543+
descs <- pubsubLatencyMeasureCountDesc
534544
descs <- pubsubLatencyMeasureErrDesc
535545
}
536546

@@ -558,27 +568,32 @@ func (p *PGPubsub) Collect(metrics chan<- prometheus.Metric) {
558568
metrics <- prometheus.MustNewConstMetric(currentEventsDesc, prometheus.GaugeValue, float64(events))
559569

560570
// additional metrics
561-
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
562-
defer cancel()
571+
latency := p.latencyMeasurer.LastMeasurement()
572+
if latency == nil {
573+
p.logger.Debug(context.Background(), "latency measurement not completed yet")
574+
return
575+
}
563576

564-
send, recv, err := p.latencyMeasurer.Measure(ctx, p)
565-
if err != nil {
566-
p.logger.Warn(context.Background(), "failed to measure latency", slog.Error(err))
567-
metrics <- prometheus.MustNewConstMetric(pubsubLatencyMeasureErrDesc, prometheus.CounterValue, p.latencyErrCounter.Add(1))
577+
metrics <- prometheus.MustNewConstMetric(pubsubLatencyMeasureCountDesc, prometheus.CounterValue, float64(p.latencyMeasurer.MeasurementCount()))
578+
if latency.Err != nil {
579+
p.logger.Warn(context.Background(), "failed to measure latency", slog.Error(latency.Err))
580+
metrics <- prometheus.MustNewConstMetric(pubsubLatencyMeasureErrDesc, prometheus.CounterValue, float64(p.latencyErrCounter.Add(1)))
568581
return
569582
}
570-
metrics <- prometheus.MustNewConstMetric(pubsubSendLatencyDesc, prometheus.GaugeValue, send)
571-
metrics <- prometheus.MustNewConstMetric(pubsubRecvLatencyDesc, prometheus.GaugeValue, recv)
583+
metrics <- prometheus.MustNewConstMetric(pubsubSendLatencyDesc, prometheus.GaugeValue, latency.Send.Seconds())
584+
metrics <- prometheus.MustNewConstMetric(pubsubRecvLatencyDesc, prometheus.GaugeValue, latency.Recv.Seconds())
572585
}
573586

574587
// New creates a new Pubsub implementation using a PostgreSQL connection.
575-
func New(startCtx context.Context, logger slog.Logger, database *sql.DB, connectURL string) (*PGPubsub, error) {
588+
func New(startCtx context.Context, logger slog.Logger, database *sql.DB, connectURL string, latencyMeasureInterval time.Duration) (*PGPubsub, error) {
576589
p := newWithoutListener(logger, database)
577590
if err := p.startListener(startCtx, connectURL); err != nil {
578591
return nil, err
579592
}
580593
go p.listen()
581594
logger.Info(startCtx, "pubsub has started")
595+
go p.latencyMeasurer.MeasureAsync(startCtx, p, latencyMeasureInterval)
596+
logger.Debug(startCtx, "background latency measurement has started")
582597
return p, nil
583598
}
584599

coderd/database/pubsub/pubsub_linux_test.go

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ func TestPubsub(t *testing.T) {
4646
db, err := sql.Open("postgres", connectionURL)
4747
require.NoError(t, err)
4848
defer db.Close()
49-
pubsub, err := pubsub.New(ctx, logger, db, connectionURL)
49+
pubsub, err := pubsub.New(ctx, logger, db, connectionURL, pubsub.LatencyMeasureInterval)
5050
require.NoError(t, err)
5151
defer pubsub.Close()
5252
event := "test"
@@ -75,7 +75,7 @@ func TestPubsub(t *testing.T) {
7575
db, err := sql.Open("postgres", connectionURL)
7676
require.NoError(t, err)
7777
defer db.Close()
78-
pubsub, err := pubsub.New(ctx, logger, db, connectionURL)
78+
pubsub, err := pubsub.New(ctx, logger, db, connectionURL, pubsub.LatencyMeasureInterval)
7979
require.NoError(t, err)
8080
defer pubsub.Close()
8181
cancelFunc()
@@ -91,7 +91,7 @@ func TestPubsub(t *testing.T) {
9191
db, err := sql.Open("postgres", connectionURL)
9292
require.NoError(t, err)
9393
defer db.Close()
94-
pubsub, err := pubsub.New(ctx, logger, db, connectionURL)
94+
pubsub, err := pubsub.New(ctx, logger, db, connectionURL, pubsub.LatencyMeasureInterval)
9595
require.NoError(t, err)
9696
defer pubsub.Close()
9797

@@ -128,7 +128,7 @@ func TestPubsub_ordering(t *testing.T) {
128128
db, err := sql.Open("postgres", connectionURL)
129129
require.NoError(t, err)
130130
defer db.Close()
131-
ps, err := pubsub.New(ctx, logger, db, connectionURL)
131+
ps, err := pubsub.New(ctx, logger, db, connectionURL, pubsub.LatencyMeasureInterval)
132132
require.NoError(t, err)
133133
defer ps.Close()
134134
event := "test"
@@ -177,7 +177,7 @@ func TestPubsub_Disconnect(t *testing.T) {
177177
ctx, cancelFunc := context.WithTimeout(context.Background(), testutil.WaitSuperLong)
178178
defer cancelFunc()
179179
logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug)
180-
ps, err := pubsub.New(ctx, logger, db, connectionURL)
180+
ps, err := pubsub.New(ctx, logger, db, connectionURL, pubsub.LatencyMeasureInterval)
181181
require.NoError(t, err)
182182
defer ps.Close()
183183
event := "test"
@@ -309,7 +309,7 @@ func TestMeasureLatency(t *testing.T) {
309309
require.NoError(t, err)
310310
db, err := sql.Open("postgres", connectionURL)
311311
require.NoError(t, err)
312-
ps, err := pubsub.New(ctx, logger, db, connectionURL)
312+
ps, err := pubsub.New(ctx, logger, db, connectionURL, pubsub.LatencyMeasureInterval)
313313
require.NoError(t, err)
314314

315315
return ps, func() {
@@ -330,10 +330,10 @@ func TestMeasureLatency(t *testing.T) {
330330
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
331331
defer cancel()
332332

333-
send, recv, err := pubsub.NewLatencyMeasurer(logger).Measure(ctx, ps)
334-
require.NoError(t, err)
335-
require.Greater(t, send, 0.0)
336-
require.Greater(t, recv, 0.0)
333+
l := pubsub.NewLatencyMeasurer(logger).Measure(ctx, ps)
334+
require.NoError(t, l.Err)
335+
require.Greater(t, l.Send.Seconds(), 0.0)
336+
require.Greater(t, l.Recv.Seconds(), 0.0)
337337
})
338338

339339
t.Run("MeasureLatencyRecvTimeout", func(t *testing.T) {
@@ -343,13 +343,13 @@ func TestMeasureLatency(t *testing.T) {
343343
ps, done := newPubsub()
344344
defer done()
345345

346-
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(-time.Second))
346+
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(-time.Minute))
347347
defer cancel()
348348

349-
send, recv, err := pubsub.NewLatencyMeasurer(logger).Measure(ctx, ps)
350-
require.ErrorContains(t, err, context.DeadlineExceeded.Error())
351-
require.Greater(t, send, 0.0)
352-
require.EqualValues(t, recv, -1)
349+
l := pubsub.NewLatencyMeasurer(logger).Measure(ctx, ps)
350+
require.ErrorContains(t, l.Err, context.DeadlineExceeded.Error())
351+
require.Greater(t, l.Send.Seconds(), 0.0)
352+
require.EqualValues(t, l.Recv, time.Duration(-1))
353353
})
354354

355355
t.Run("MeasureLatencyNotifyRace", func(t *testing.T) {
@@ -378,10 +378,10 @@ func TestMeasureLatency(t *testing.T) {
378378
defer wg.Done()
379379

380380
hold <- struct{}{}
381-
send, recv, err := lm.Measure(ctx, slow)
382-
assert.NoError(t, err)
383-
assert.Greater(t, send, 0.0)
384-
assert.Greater(t, recv, 0.0)
381+
l := lm.Measure(ctx, slow)
382+
assert.NoError(t, l.Err)
383+
assert.Greater(t, l.Send.Seconds(), 0.0)
384+
assert.Greater(t, l.Recv.Seconds(), 0.0)
385385

386386
// The slow subscriber will complete first and receive the fast publisher's message first.
387387
logger.Sync()
@@ -396,10 +396,10 @@ func TestMeasureLatency(t *testing.T) {
396396
// Force fast publisher to start after the slow one to avoid flakiness.
397397
<-hold
398398
time.Sleep(time.Millisecond * 50)
399-
send, recv, err := lm.Measure(ctx, fast)
400-
assert.NoError(t, err)
401-
assert.Greater(t, send, 0.0)
402-
assert.Greater(t, recv, 0.0)
399+
l := lm.Measure(ctx, fast)
400+
assert.NoError(t, l.Err)
401+
assert.Greater(t, l.Send.Seconds(), 0.0)
402+
assert.Greater(t, l.Recv.Seconds(), 0.0)
403403
}()
404404

405405
wg.Wait()

0 commit comments

Comments
 (0)