Skip to content

Commit 49d2002

Browse files
committed
Reliably cause notify races by delaying publishes
Signed-off-by: Danny Kopping <danny@coder.com>
1 parent 65f57b1 commit 49d2002

File tree

2 files changed

+47
-52
lines changed

2 files changed

+47
-52
lines changed

coderd/database/pubsub/latency.go

Lines changed: 14 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -33,30 +33,24 @@ func NewLatencyMeasurer(logger slog.Logger) *LatencyMeasurer {
3333
// Measure takes a given Pubsub implementation, publishes a message & immediately receives it, and returns the observed latency.
3434
func (lm *LatencyMeasurer) Measure(ctx context.Context, p Pubsub) (send float64, recv float64, err error) {
3535
var (
36-
start time.Time
37-
res = make(chan float64, 1)
38-
subscribeErr = make(chan error, 1)
36+
start time.Time
37+
res = make(chan float64, 1)
3938
)
4039

4140
msg := []byte(uuid.New().String())
42-
log := lm.logger.With(slog.F("msg", msg))
4341

44-
go func() {
45-
_, err = p.Subscribe(lm.latencyChannelName(), func(ctx context.Context, in []byte) {
46-
p := p
47-
_ = p
48-
49-
if !bytes.Equal(in, msg) {
50-
log.Warn(ctx, "received unexpected message!", slog.F("in", in))
51-
return
52-
}
53-
54-
res <- time.Since(start).Seconds()
55-
})
56-
if err != nil {
57-
subscribeErr <- xerrors.Errorf("failed to subscribe: %w", err)
42+
cancel, err := p.Subscribe(lm.latencyChannelName(), func(ctx context.Context, in []byte) {
43+
if !bytes.Equal(in, msg) {
44+
lm.logger.Warn(ctx, "received unexpected message", slog.F("got", in), slog.F("expected", msg))
45+
return
5846
}
59-
}()
47+
48+
res <- time.Since(start).Seconds()
49+
})
50+
if err != nil {
51+
return -1, -1, xerrors.Errorf("failed to subscribe: %w", err)
52+
}
53+
defer cancel()
6054

6155
start = time.Now()
6256
err = p.Publish(lm.latencyChannelName(), msg)
@@ -68,12 +62,10 @@ func (lm *LatencyMeasurer) Measure(ctx context.Context, p Pubsub) (send float64,
6862

6963
select {
7064
case <-ctx.Done():
71-
log.Error(ctx, "context canceled before message could be received", slog.Error(ctx.Err()))
65+
lm.logger.Error(ctx, "context canceled before message could be received", slog.Error(ctx.Err()), slog.F("msg", msg))
7266
return send, -1, ctx.Err()
7367
case val := <-res:
7468
return send, val, nil
75-
case err = <-subscribeErr:
76-
return send, -1, err
7769
}
7870
}
7971

coderd/database/pubsub/pubsub_linux_test.go

Lines changed: 33 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,12 @@ import (
1313
"testing"
1414
"time"
1515

16-
"cdr.dev/slog/sloggers/sloghuman"
1716
"github.com/stretchr/testify/assert"
1817
"github.com/stretchr/testify/require"
1918
"golang.org/x/xerrors"
2019

20+
"cdr.dev/slog/sloggers/sloghuman"
21+
2122
"cdr.dev/slog"
2223
"cdr.dev/slog/sloggers/slogtest"
2324
"github.com/coder/coder/v2/coderd/database/dbtestutil"
@@ -342,8 +343,7 @@ func TestMeasureLatency(t *testing.T) {
342343
ps, done := newPubsub()
343344
defer done()
344345

345-
// nolint:gocritic // need a very short timeout here to trigger error
346-
ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond)
346+
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(-time.Second))
347347
defer cancel()
348348

349349
send, recv, err := pubsub.NewLatencyMeasurer(logger).Measure(ctx, ps)
@@ -356,75 +356,78 @@ func TestMeasureLatency(t *testing.T) {
356356
t.Parallel()
357357

358358
var buf bytes.Buffer
359-
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
359+
logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug)
360360
logger = logger.AppendSinks(sloghuman.Sink(&buf))
361361

362362
lm := pubsub.NewLatencyMeasurer(logger)
363363
ps, done := newPubsub()
364364
defer done()
365365

366-
slow := newDelayedListener(ps, time.Second)
367-
fast := newDelayedListener(ps, time.Nanosecond)
366+
slow := newDelayedPublisher(ps, time.Second)
367+
fast := newDelayedPublisher(ps, time.Nanosecond)
368+
hold := make(chan struct{}, 1)
368369

370+
// Start two goroutines in which two subscribers are registered but the messages are received out-of-order because
371+
// the first Pubsub will publish its message slowly and the second will publish it quickly. Both will ultimately
372+
// receive their desired messages, but the slow publisher will receive an unexpected message first.
369373
var wg sync.WaitGroup
370374
wg.Add(2)
371-
372-
// Publish message concurrently to a slow receiver.
373375
go func() {
374376
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
375377
defer cancel()
376378
defer wg.Done()
377379

378-
// Slow receiver will not receive its latency message because the fast one receives it first.
379-
_, _, err := lm.Measure(ctx, slow)
380-
require.ErrorContains(t, err, context.DeadlineExceeded.Error())
380+
hold <- struct{}{}
381+
send, recv, err := lm.Measure(ctx, slow)
382+
assert.NoError(t, err)
383+
assert.Greater(t, send, 0.0)
384+
assert.Greater(t, recv, 0.0)
385+
386+
// The slow publisher's subscriber is registered first, so it receives the fast publisher's message (logged as unexpected) before its own.
387+
logger.Sync()
388+
assert.Contains(t, buf.String(), "received unexpected message")
381389
}()
382390

383-
// Publish message concurrently to a fast receiver who will receive both its own and the slow receiver's messages.
384-
// It should ignore the unexpected message and consume its own, leaving the slow receiver to timeout since it
385-
// will never receive their own message.
386391
go func() {
387392
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
388393
defer cancel()
389394
defer wg.Done()
390395

396+
// Force fast publisher to start after the slow one to avoid flakiness.
397+
<-hold
398+
time.Sleep(time.Millisecond * 50)
391399
send, recv, err := lm.Measure(ctx, fast)
392-
require.NoError(t, err)
393-
require.Greater(t, send, 0.0)
394-
require.Greater(t, recv, 0.0)
400+
assert.NoError(t, err)
401+
assert.Greater(t, send, 0.0)
402+
assert.Greater(t, recv, 0.0)
395403
}()
396404

397405
wg.Wait()
398-
399-
// Flush the contents of the logger to its buffer.
400-
logger.Sync()
401-
require.Contains(t, buf.String(), "received unexpected message!")
402406
})
403407
}
404408

405-
type delayedListener struct {
409+
type delayedPublisher struct {
406410
pubsub.Pubsub
407411
delay time.Duration
408412
}
409413

410-
func newDelayedListener(ps pubsub.Pubsub, delay time.Duration) *delayedListener {
411-
return &delayedListener{Pubsub: ps, delay: delay}
414+
func newDelayedPublisher(ps pubsub.Pubsub, delay time.Duration) *delayedPublisher {
415+
return &delayedPublisher{Pubsub: ps, delay: delay}
412416
}
413417

414-
func (s *delayedListener) Subscribe(event string, listener pubsub.Listener) (cancel func(), err error) {
415-
time.Sleep(s.delay)
418+
func (s *delayedPublisher) Subscribe(event string, listener pubsub.Listener) (cancel func(), err error) {
416419
return s.Pubsub.Subscribe(event, listener)
417420
}
418421

419-
func (s *delayedListener) SubscribeWithErr(event string, listener pubsub.ListenerWithErr) (cancel func(), err error) {
420-
time.Sleep(s.delay)
422+
func (s *delayedPublisher) SubscribeWithErr(event string, listener pubsub.ListenerWithErr) (cancel func(), err error) {
421423
return s.Pubsub.SubscribeWithErr(event, listener)
422424
}
423425

424-
func (s *delayedListener) Publish(event string, message []byte) error {
426+
func (s *delayedPublisher) Publish(event string, message []byte) error {
427+
time.Sleep(s.delay)
425428
return s.Pubsub.Publish(event, message)
426429
}
427430

428-
func (s *delayedListener) Close() error {
431+
func (s *delayedPublisher) Close() error {
429432
return s.Pubsub.Close()
430433
}

0 commit comments

Comments
 (0)