Skip to content

feat: measure pubsub latencies and expose metrics #13126

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 17 commits into from
May 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions coderd/database/pubsub/latency.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package pubsub

import (
"bytes"
"context"
"fmt"
"time"

"github.com/google/uuid"
"golang.org/x/xerrors"

"cdr.dev/slog"
)

// LatencyMeasurer is used to measure the send & receive latencies of the underlying Pubsub implementation. We use these
// measurements to export metrics which can indicate when a Pubsub implementation's queue is overloaded and/or full.
type LatencyMeasurer struct {
	// channel is a unique pubsub channel name so that multiple coderd replicas do not clash when performing latency measurements.
	channel uuid.UUID
	// logger receives debug/warn/error output emitted during measurements.
	logger slog.Logger
}

// LatencyMessageLength is the length in bytes of the canonical string form of a UUIDv4
// (32 hex digits plus 4 hyphens), which is the payload published by each measurement.
const LatencyMessageLength = 36

// NewLatencyMeasurer constructs a LatencyMeasurer whose measurement channel is
// derived from a freshly generated UUID, so concurrent replicas never collide.
func NewLatencyMeasurer(logger slog.Logger) *LatencyMeasurer {
	lm := &LatencyMeasurer{logger: logger}
	lm.channel = uuid.New()
	return lm
}

// Measure takes a given Pubsub implementation, publishes a message & immediately receives it, and returns the observed latency.
// send is valid once publishing succeeded; recv is -1 (with a non-nil error) when the
// message was not received before ctx was done.
func (lm *LatencyMeasurer) Measure(ctx context.Context, p Pubsub) (send, recv time.Duration, err error) {
	var (
		start time.Time
		res   = make(chan time.Duration, 1)
	)

	// A fresh random payload per measurement lets the subscriber discard stale
	// or foreign messages that race onto the same channel.
	msg := []byte(uuid.New().String())
	lm.logger.Debug(ctx, "performing measurement", slog.F("msg", msg))

	channel := lm.latencyChannelName()
	cancel, err := p.Subscribe(channel, func(ctx context.Context, in []byte) {
		if !bytes.Equal(in, msg) {
			lm.logger.Warn(ctx, "received unexpected message", slog.F("got", in), slog.F("expected", msg))
			return
		}

		// Non-blocking send: res is buffered with capacity 1, so if the expected
		// message is delivered more than once, dropping the extra result prevents
		// this callback from blocking the pubsub's listener goroutine forever.
		select {
		case res <- time.Since(start):
		default:
		}
	})
	if err != nil {
		return -1, -1, xerrors.Errorf("failed to subscribe: %w", err)
	}
	defer cancel()

	start = time.Now()
	err = p.Publish(channel, msg)
	if err != nil {
		return -1, -1, xerrors.Errorf("failed to publish: %w", err)
	}

	// Send latency is the time taken for Publish to return; receive latency is
	// measured from the same start inside the subscription callback.
	send = time.Since(start)
	select {
	case <-ctx.Done():
		lm.logger.Error(ctx, "context canceled before message could be received", slog.Error(ctx.Err()), slog.F("msg", msg))
		return send, -1, ctx.Err()
	case recv = <-res:
		return send, recv, nil
	}
}

// latencyChannelName scopes the measurement channel by this replica's unique UUID.
func (lm *LatencyMeasurer) latencyChannelName() string {
	return fmt.Sprintf("latency-measure:%s", lm.channel.String())
}
61 changes: 57 additions & 4 deletions coderd/database/pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"net"
"sync"
"sync/atomic"
"time"

"github.com/google/uuid"
Expand All @@ -28,6 +29,9 @@ type ListenerWithErr func(ctx context.Context, message []byte, err error)
// might have been dropped.
var ErrDroppedMessages = xerrors.New("dropped messages")

// LatencyMeasureTimeout bounds how long a single background latency measurement
// may take before it is abandoned (used as the context timeout around Measure).
const LatencyMeasureTimeout = time.Second * 10

// Pubsub is a generic interface for broadcasting and receiving messages.
// Implementors should assume high-availability with the backing implementation.
type Pubsub interface {
Expand Down Expand Up @@ -205,6 +209,10 @@ type PGPubsub struct {
receivedBytesTotal prometheus.Counter
disconnectionsTotal prometheus.Counter
connected prometheus.Gauge

latencyMeasurer *LatencyMeasurer
latencyMeasureCounter atomic.Int64
latencyErrCounter atomic.Int64
}

// BufferSize is the maximum number of unhandled messages we will buffer
Expand Down Expand Up @@ -478,6 +486,30 @@ var (
)
)

// additional metrics collected out-of-band: these descriptors are not backed by
// registered collectors; they are emitted as const metrics during Collect, which
// performs a fresh latency measurement on every scrape.
var (
	pubsubSendLatencyDesc = prometheus.NewDesc(
		"coder_pubsub_send_latency_seconds",
		"The time taken to send a message into a pubsub event channel",
		nil, nil,
	)
	pubsubRecvLatencyDesc = prometheus.NewDesc(
		"coder_pubsub_receive_latency_seconds",
		"The time taken to receive a message from a pubsub event channel",
		nil, nil,
	)
	pubsubLatencyMeasureCountDesc = prometheus.NewDesc(
		"coder_pubsub_latency_measures_total",
		"The number of pubsub latency measurements",
		nil, nil,
	)
	pubsubLatencyMeasureErrDesc = prometheus.NewDesc(
		"coder_pubsub_latency_measure_errs_total",
		"The number of pubsub latency measurement failures",
		nil, nil,
	)
)

// We'll track messages as size "normal" and "colossal", where the
// latter are messages larger than 7600 bytes, or 95% of the postgres
// notify limit. If we see a lot of colossal packets that's an indication that
Expand All @@ -504,6 +536,12 @@ func (p *PGPubsub) Describe(descs chan<- *prometheus.Desc) {
// implicit metrics
descs <- currentSubscribersDesc
descs <- currentEventsDesc

// additional metrics
descs <- pubsubSendLatencyDesc
descs <- pubsubRecvLatencyDesc
descs <- pubsubLatencyMeasureCountDesc
descs <- pubsubLatencyMeasureErrDesc
}

// Collect implements, along with Describe, the prometheus.Collector interface
Expand All @@ -528,6 +566,20 @@ func (p *PGPubsub) Collect(metrics chan<- prometheus.Metric) {
p.qMu.Unlock()
metrics <- prometheus.MustNewConstMetric(currentSubscribersDesc, prometheus.GaugeValue, float64(subs))
metrics <- prometheus.MustNewConstMetric(currentEventsDesc, prometheus.GaugeValue, float64(events))

// additional metrics
ctx, cancel := context.WithTimeout(context.Background(), LatencyMeasureTimeout)
defer cancel()
send, recv, err := p.latencyMeasurer.Measure(ctx, p)

metrics <- prometheus.MustNewConstMetric(pubsubLatencyMeasureCountDesc, prometheus.CounterValue, float64(p.latencyMeasureCounter.Add(1)))
if err != nil {
p.logger.Warn(context.Background(), "failed to measure latency", slog.Error(err))
metrics <- prometheus.MustNewConstMetric(pubsubLatencyMeasureErrDesc, prometheus.CounterValue, float64(p.latencyErrCounter.Add(1)))
return
}
metrics <- prometheus.MustNewConstMetric(pubsubSendLatencyDesc, prometheus.GaugeValue, send.Seconds())
metrics <- prometheus.MustNewConstMetric(pubsubRecvLatencyDesc, prometheus.GaugeValue, recv.Seconds())
}

// New creates a new Pubsub implementation using a PostgreSQL connection.
Expand All @@ -544,10 +596,11 @@ func New(startCtx context.Context, logger slog.Logger, database *sql.DB, connect
// newWithoutListener creates a new PGPubsub without creating the pqListener.
func newWithoutListener(logger slog.Logger, database *sql.DB) *PGPubsub {
return &PGPubsub{
logger: logger,
listenDone: make(chan struct{}),
db: database,
queues: make(map[string]map[uuid.UUID]*msgQueue),
logger: logger,
listenDone: make(chan struct{}),
db: database,
queues: make(map[string]map[uuid.UUID]*msgQueue),
latencyMeasurer: NewLatencyMeasurer(logger.Named("latency-measurer")),

publishesTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "coder",
Expand Down
111 changes: 111 additions & 0 deletions coderd/database/pubsub/pubsub_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package pubsub_test

import (
"bytes"
"context"
"database/sql"
"fmt"
Expand All @@ -15,6 +16,8 @@ import (
"github.com/stretchr/testify/require"
"golang.org/x/xerrors"

"cdr.dev/slog/sloggers/sloghuman"

"cdr.dev/slog"
"cdr.dev/slog/sloggers/slogtest"
"github.com/coder/coder/v2/coderd/database/dbtestutil"
Expand Down Expand Up @@ -294,3 +297,111 @@ func TestPubsub_Disconnect(t *testing.T) {
}
require.True(t, gotDroppedErr)
}

// TestMeasureLatency exercises pubsub.LatencyMeasurer against a real
// Postgres-backed pubsub: the happy path, a receive timeout, and a racing
// publisher injecting an unexpected message on the same channel.
func TestMeasureLatency(t *testing.T) {
	t.Parallel()

	// newPubsub spins up a fresh Postgres-backed pubsub. The returned cleanup
	// closes the pubsub, the DB handle, the database, and the context, in order.
	newPubsub := func() (pubsub.Pubsub, func()) {
		ctx, cancel := context.WithCancel(context.Background())
		logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
		connectionURL, closePg, err := dbtestutil.Open()
		require.NoError(t, err)
		db, err := sql.Open("postgres", connectionURL)
		require.NoError(t, err)
		ps, err := pubsub.New(ctx, logger, db, connectionURL)
		require.NoError(t, err)

		return ps, func() {
			_ = ps.Close()
			_ = db.Close()
			closePg()
			cancel()
		}
	}

	t.Run("MeasureLatency", func(t *testing.T) {
		t.Parallel()

		logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
		ps, done := newPubsub()
		defer done()

		ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
		defer cancel()

		// Happy path: both measured latencies must be strictly positive.
		send, recv, err := pubsub.NewLatencyMeasurer(logger).Measure(ctx, ps)
		require.NoError(t, err)
		require.Greater(t, send.Seconds(), 0.0)
		require.Greater(t, recv.Seconds(), 0.0)
	})

	t.Run("MeasureLatencyRecvTimeout", func(t *testing.T) {
		t.Parallel()

		logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
		ps, done := newPubsub()
		defer done()

		// Deadline already in the past: the publish still succeeds (send is
		// measured), but the receive select observes ctx.Done() immediately,
		// so Measure reports recv = -1 with a deadline-exceeded error.
		ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(-time.Hour))
		defer cancel()

		send, recv, err := pubsub.NewLatencyMeasurer(logger).Measure(ctx, ps)
		require.ErrorContains(t, err, context.DeadlineExceeded.Error())
		require.Greater(t, send.Seconds(), 0.0)
		require.EqualValues(t, recv, time.Duration(-1))
	})

	t.Run("MeasureLatencyNotifyRace", func(t *testing.T) {
		t.Parallel()

		// Capture log output in buf so we can assert that the measurer logged
		// (and ignored) the interloping message rather than mis-measuring it.
		var buf bytes.Buffer
		logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug)
		logger = logger.AppendSinks(sloghuman.Sink(&buf))

		lm := pubsub.NewLatencyMeasurer(logger)
		ps, done := newPubsub()
		defer done()

		racy := newRacyPubsub(ps)
		ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
		defer cancel()

		// Despite the injected "nonsense" message, the measurement must still
		// complete successfully using the expected payload.
		send, recv, err := lm.Measure(ctx, racy)
		assert.NoError(t, err)
		assert.Greater(t, send.Seconds(), 0.0)
		assert.Greater(t, recv.Seconds(), 0.0)

		logger.Sync()
		assert.Contains(t, buf.String(), "received unexpected message")
	})
}

// racyPubsub simulates a race on the same channel by publishing two messages (one expected, one not).
// This is used to verify that a subscriber will only listen for the message it explicitly expects.
//
// Only Publish is overridden; Subscribe, SubscribeWithErr, and Close are
// promoted unchanged from the embedded Pubsub, so explicit pass-through
// wrappers for them are unnecessary.
type racyPubsub struct {
	pubsub.Pubsub
}

// newRacyPubsub wraps an existing Pubsub with race-simulating publish behavior.
func newRacyPubsub(ps pubsub.Pubsub) *racyPubsub {
	return &racyPubsub{ps}
}

// Publish first sends a bogus payload on the same event channel, then the real
// message, simulating an unrelated writer racing the measurement.
func (s *racyPubsub) Publish(event string, message []byte) error {
	err := s.Pubsub.Publish(event, []byte("nonsense"))
	if err != nil {
		return xerrors.Errorf("failed to send simulated race: %w", err)
	}
	return s.Pubsub.Publish(event, message)
}
39 changes: 28 additions & 11 deletions coderd/database/pubsub/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,11 @@ func TestPGPubsub_Metrics(t *testing.T) {
err = registry.Register(uut)
require.NoError(t, err)

// each Gather measures pubsub latency by publishing a message & subscribing to it
var gatherCount float64

metrics, err := registry.Gather()
gatherCount++
require.NoError(t, err)
require.True(t, testutil.PromGaugeHasValue(t, metrics, 0, "coder_pubsub_current_events"))
require.True(t, testutil.PromGaugeHasValue(t, metrics, 0, "coder_pubsub_current_subscribers"))
Expand All @@ -59,19 +63,26 @@ func TestPGPubsub_Metrics(t *testing.T) {
_ = testutil.RequireRecvCtx(ctx, t, messageChannel)

require.Eventually(t, func() bool {
latencyBytes := gatherCount * pubsub.LatencyMessageLength
metrics, err = registry.Gather()
gatherCount++
assert.NoError(t, err)
return testutil.PromGaugeHasValue(t, metrics, 1, "coder_pubsub_current_events") &&
testutil.PromGaugeHasValue(t, metrics, 1, "coder_pubsub_current_subscribers") &&
testutil.PromGaugeHasValue(t, metrics, 1, "coder_pubsub_connected") &&
testutil.PromCounterHasValue(t, metrics, 1, "coder_pubsub_publishes_total", "true") &&
testutil.PromCounterHasValue(t, metrics, 1, "coder_pubsub_subscribes_total", "true") &&
testutil.PromCounterHasValue(t, metrics, 1, "coder_pubsub_messages_total", "normal") &&
testutil.PromCounterHasValue(t, metrics, 7, "coder_pubsub_received_bytes_total") &&
testutil.PromCounterHasValue(t, metrics, 7, "coder_pubsub_published_bytes_total")
testutil.PromCounterHasValue(t, metrics, gatherCount, "coder_pubsub_publishes_total", "true") &&
testutil.PromCounterHasValue(t, metrics, gatherCount, "coder_pubsub_subscribes_total", "true") &&
testutil.PromCounterHasValue(t, metrics, gatherCount, "coder_pubsub_messages_total", "normal") &&
testutil.PromCounterHasValue(t, metrics, float64(len(data))+latencyBytes, "coder_pubsub_received_bytes_total") &&
testutil.PromCounterHasValue(t, metrics, float64(len(data))+latencyBytes, "coder_pubsub_published_bytes_total") &&
testutil.PromGaugeAssertion(t, metrics, func(in float64) bool { return in > 0 }, "coder_pubsub_send_latency_seconds") &&
testutil.PromGaugeAssertion(t, metrics, func(in float64) bool { return in > 0 }, "coder_pubsub_receive_latency_seconds") &&
testutil.PromCounterHasValue(t, metrics, gatherCount, "coder_pubsub_latency_measures_total") &&
!testutil.PromCounterGathered(t, metrics, "coder_pubsub_latency_measure_errs_total")
}, testutil.WaitShort, testutil.IntervalFast)

colossalData := make([]byte, 7600)
colossalSize := 7600
colossalData := make([]byte, colossalSize)
for i := range colossalData {
colossalData[i] = 'q'
}
Expand All @@ -89,16 +100,22 @@ func TestPGPubsub_Metrics(t *testing.T) {
_ = testutil.RequireRecvCtx(ctx, t, messageChannel)

require.Eventually(t, func() bool {
latencyBytes := gatherCount * pubsub.LatencyMessageLength
metrics, err = registry.Gather()
gatherCount++
assert.NoError(t, err)
return testutil.PromGaugeHasValue(t, metrics, 1, "coder_pubsub_current_events") &&
testutil.PromGaugeHasValue(t, metrics, 2, "coder_pubsub_current_subscribers") &&
testutil.PromGaugeHasValue(t, metrics, 1, "coder_pubsub_connected") &&
testutil.PromCounterHasValue(t, metrics, 2, "coder_pubsub_publishes_total", "true") &&
testutil.PromCounterHasValue(t, metrics, 2, "coder_pubsub_subscribes_total", "true") &&
testutil.PromCounterHasValue(t, metrics, 1, "coder_pubsub_messages_total", "normal") &&
testutil.PromCounterHasValue(t, metrics, 1+gatherCount, "coder_pubsub_publishes_total", "true") &&
testutil.PromCounterHasValue(t, metrics, 1+gatherCount, "coder_pubsub_subscribes_total", "true") &&
testutil.PromCounterHasValue(t, metrics, gatherCount, "coder_pubsub_messages_total", "normal") &&
testutil.PromCounterHasValue(t, metrics, 1, "coder_pubsub_messages_total", "colossal") &&
testutil.PromCounterHasValue(t, metrics, 7607, "coder_pubsub_received_bytes_total") &&
testutil.PromCounterHasValue(t, metrics, 7607, "coder_pubsub_published_bytes_total")
testutil.PromCounterHasValue(t, metrics, float64(colossalSize+len(data))+latencyBytes, "coder_pubsub_received_bytes_total") &&
testutil.PromCounterHasValue(t, metrics, float64(colossalSize+len(data))+latencyBytes, "coder_pubsub_published_bytes_total") &&
testutil.PromGaugeAssertion(t, metrics, func(in float64) bool { return in > 0 }, "coder_pubsub_send_latency_seconds") &&
testutil.PromGaugeAssertion(t, metrics, func(in float64) bool { return in > 0 }, "coder_pubsub_receive_latency_seconds") &&
testutil.PromCounterHasValue(t, metrics, gatherCount, "coder_pubsub_latency_measures_total") &&
!testutil.PromCounterGathered(t, metrics, "coder_pubsub_latency_measure_errs_total")
}, testutil.WaitShort, testutil.IntervalFast)
}
Loading