Skip to content

Commit 633365d

Browse files
committed
Fake pubsub to simulate race
Signed-off-by: Danny Kopping <danny@coder.com>
1 parent a7c042f commit 633365d

File tree

1 file changed

+22
-48
lines changed

1 file changed

+22
-48
lines changed

coderd/database/pubsub/pubsub_linux_test.go

Lines changed: 22 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
"fmt"
1010
"math/rand"
1111
"strconv"
12-
"sync"
1312
"testing"
1413
"time"
1514

@@ -363,71 +362,46 @@ func TestMeasureLatency(t *testing.T) {
363362
ps, done := newPubsub()
364363
defer done()
365364

366-
slow := newDelayedPublisher(ps, time.Second)
367-
fast := newDelayedPublisher(ps, time.Nanosecond)
368-
hold := make(chan struct{}, 1)
369-
370-
// Start two goroutines in which two subscribers are registered but the messages are received out-of-order because
371-
// the first Pubsub will publish its message slowly and the second will publish it quickly. Both will ultimately
372-
// receive their desired messages, but the slow publisher will receive an unexpected message first.
373-
var wg sync.WaitGroup
374-
wg.Add(2)
375-
go func() {
376-
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
377-
defer cancel()
378-
defer wg.Done()
379-
380-
hold <- struct{}{}
381-
l := lm.Measure(ctx, slow)
382-
assert.NoError(t, l.Err)
383-
assert.Greater(t, l.Send.Seconds(), 0.0)
384-
assert.Greater(t, l.Recv.Seconds(), 0.0)
385-
386-
// The slow subscriber will complete first and receive the fast publisher's message first.
387-
logger.Sync()
388-
assert.Contains(t, buf.String(), "received unexpected message")
389-
}()
365+
racy := newRacyPubsub(ps)
366+
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
367+
defer cancel()
390368

391-
go func() {
392-
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
393-
defer cancel()
394-
defer wg.Done()
395-
396-
// Force fast publisher to start after the slow one to avoid flakiness.
397-
<-hold
398-
time.Sleep(time.Millisecond * 50)
399-
l := lm.Measure(ctx, fast)
400-
assert.NoError(t, l.Err)
401-
assert.Greater(t, l.Send.Seconds(), 0.0)
402-
assert.Greater(t, l.Recv.Seconds(), 0.0)
403-
}()
369+
l := lm.Measure(ctx, racy)
370+
assert.NoError(t, l.Err)
371+
assert.Greater(t, l.Send.Seconds(), 0.0)
372+
assert.Greater(t, l.Recv.Seconds(), 0.0)
404373

405-
wg.Wait()
374+
logger.Sync()
375+
assert.Contains(t, buf.String(), "received unexpected message")
406376
})
407377
}
408378

409-
type delayedPublisher struct {
379+
// racyPubsub simulates a race on the same channel by publishing two messages (one expected, one not).
380+
// This is used to verify that a subscriber will only listen for the message it explicitly expects.
381+
type racyPubsub struct {
410382
pubsub.Pubsub
411-
delay time.Duration
412383
}
413384

414-
func newDelayedPublisher(ps pubsub.Pubsub, delay time.Duration) *delayedPublisher {
415-
return &delayedPublisher{Pubsub: ps, delay: delay}
385+
func newRacyPubsub(ps pubsub.Pubsub) *racyPubsub {
386+
return &racyPubsub{ps}
416387
}
417388

418-
func (s *delayedPublisher) Subscribe(event string, listener pubsub.Listener) (cancel func(), err error) {
389+
func (s *racyPubsub) Subscribe(event string, listener pubsub.Listener) (cancel func(), err error) {
419390
return s.Pubsub.Subscribe(event, listener)
420391
}
421392

422-
func (s *delayedPublisher) SubscribeWithErr(event string, listener pubsub.ListenerWithErr) (cancel func(), err error) {
393+
func (s *racyPubsub) SubscribeWithErr(event string, listener pubsub.ListenerWithErr) (cancel func(), err error) {
423394
return s.Pubsub.SubscribeWithErr(event, listener)
424395
}
425396

426-
func (s *delayedPublisher) Publish(event string, message []byte) error {
427-
time.Sleep(s.delay)
397+
func (s *racyPubsub) Publish(event string, message []byte) error {
398+
err := s.Pubsub.Publish(event, []byte("nonsense"))
399+
if err != nil {
400+
return xerrors.Errorf("failed to send simulated race: %w", err)
401+
}
428402
return s.Pubsub.Publish(event, message)
429403
}
430404

431-
func (s *delayedPublisher) Close() error {
405+
func (s *racyPubsub) Close() error {
432406
return s.Pubsub.Close()
433407
}

0 commit comments

Comments
 (0)