Skip to content

Commit 28a96de

Browse files
committed
Refactor to avoid global state
Signed-off-by: Danny Kopping <danny@coder.com>
1 parent 34083d0 commit 28a96de

File tree

3 files changed

+33
-20
lines changed

3 files changed

+33
-20
lines changed

coderd/database/pubsub/latency.go

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,24 +9,29 @@ import (
99
"golang.org/x/xerrors"
1010
)
1111

12-
var channelID uuid.UUID
12+
// LatencyMeasurer is used to measure the send & receive latencies of the underlying Pubsub implementation. We use these
13+
// measurements to export metrics which can indicate when a Pubsub implementation's queue is overloaded and/or full.
14+
type LatencyMeasurer struct {
15+
// Create unique pubsub channel names so that multiple replicas do not clash when performing latency measurements,
16+
// and only create one UUID per Pubsub impl (and not request) to limit the number of notification channels that need
17+
// to be maintained by the Pubsub impl.
18+
channelIDs map[Pubsub]uuid.UUID
19+
}
1320

14-
// Create a new pubsub channel UUID per coderd instance so that multiple replicas do not clash when performing latency
15-
// measurements, and only create one UUID per instance (and not request) to limit the number of notification channels
16-
// that need to be maintained by the Pubsub implementation.
17-
func init() {
18-
channelID = uuid.New()
21+
func NewLatencyMeasurer() *LatencyMeasurer {
22+
return &LatencyMeasurer{
23+
channelIDs: make(map[Pubsub]uuid.UUID),
24+
}
1925
}
2026

21-
// MeasureLatency takes a given Pubsub implementation, publishes a message & immediately receives it, and returns the
22-
// observed latency.
23-
func MeasureLatency(ctx context.Context, p Pubsub) (send float64, recv float64, err error) {
27+
// Measure takes a given Pubsub implementation, publishes a message & immediately receives it, and returns the observed latency.
28+
func (lm *LatencyMeasurer) Measure(ctx context.Context, p Pubsub) (send float64, recv float64, err error) {
2429
var (
2530
start time.Time
2631
res = make(chan float64, 1)
2732
)
2833

29-
cancel, err := p.Subscribe(latencyChannelName(), func(ctx context.Context, _ []byte) {
34+
cancel, err := p.Subscribe(lm.latencyChannelName(p), func(ctx context.Context, _ []byte) {
3035
res <- time.Since(start).Seconds()
3136
})
3237
if err != nil {
@@ -35,7 +40,7 @@ func MeasureLatency(ctx context.Context, p Pubsub) (send float64, recv float64,
3540
defer cancel()
3641

3742
start = time.Now()
38-
err = p.Publish(latencyChannelName(), []byte{})
43+
err = p.Publish(lm.latencyChannelName(p), []byte{})
3944
if err != nil {
4045
return -1, -1, xerrors.Errorf("failed to publish: %w", err)
4146
}
@@ -50,6 +55,12 @@ func MeasureLatency(ctx context.Context, p Pubsub) (send float64, recv float64,
5055
}
5156
}
5257

53-
func latencyChannelName() string {
54-
return fmt.Sprintf("latency-measure:%s", channelID.String())
58+
func (lm *LatencyMeasurer) latencyChannelName(p Pubsub) string {
59+
cid, found := lm.channelIDs[p]
60+
if !found {
61+
cid = uuid.New()
62+
lm.channelIDs[p] = cid
63+
}
64+
65+
return fmt.Sprintf("latency-measure:%s", cid.String())
5566
}

coderd/database/pubsub/pubsub.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,7 @@ type PGPubsub struct {
207207
disconnectionsTotal prometheus.Counter
208208
connected prometheus.Gauge
209209

210+
latencyMeasurer *LatencyMeasurer
210211
latencyErrCounter atomic.Float64
211212
}
212213

@@ -560,7 +561,7 @@ func (p *PGPubsub) Collect(metrics chan<- prometheus.Metric) {
560561
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
561562
defer cancel()
562563

563-
send, recv, err := MeasureLatency(ctx, p)
564+
send, recv, err := p.latencyMeasurer.Measure(ctx, p)
564565
if err != nil {
565566
p.logger.Warn(context.Background(), "failed to measure latency", slog.Error(err))
566567
metrics <- prometheus.MustNewConstMetric(pubsubLatencyMeasureErrDesc, prometheus.CounterValue, p.latencyErrCounter.Add(1))
@@ -584,10 +585,11 @@ func New(startCtx context.Context, logger slog.Logger, database *sql.DB, connect
584585
// newWithoutListener creates a new PGPubsub without creating the pqListener.
585586
func newWithoutListener(logger slog.Logger, database *sql.DB) *PGPubsub {
586587
return &PGPubsub{
587-
logger: logger,
588-
listenDone: make(chan struct{}),
589-
db: database,
590-
queues: make(map[string]map[uuid.UUID]*msgQueue),
588+
logger: logger,
589+
listenDone: make(chan struct{}),
590+
db: database,
591+
queues: make(map[string]map[uuid.UUID]*msgQueue),
592+
latencyMeasurer: NewLatencyMeasurer(),
591593

592594
publishesTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
593595
Namespace: "coder",

coderd/database/pubsub/pubsub_linux_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,7 @@ func TestMeasureLatency(t *testing.T) {
325325
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitSuperLong)
326326
defer cancel()
327327

328-
send, recv, err := pubsub.MeasureLatency(ctx, ps)
328+
send, recv, err := pubsub.NewLatencyMeasurer().Measure(ctx, ps)
329329
require.NoError(t, err)
330330
require.Greater(t, send, 0.0)
331331
require.Greater(t, recv, 0.0)
@@ -341,7 +341,7 @@ func TestMeasureLatency(t *testing.T) {
341341
ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond)
342342
defer cancel()
343343

344-
send, recv, err := pubsub.MeasureLatency(ctx, ps)
344+
send, recv, err := pubsub.NewLatencyMeasurer().Measure(ctx, ps)
345345
require.ErrorContains(t, err, context.DeadlineExceeded.Error())
346346
require.Greater(t, send, 0.0)
347347
require.EqualValues(t, recv, -1)

0 commit comments

Comments
 (0)