|
9 | 9 | "fmt"
|
10 | 10 | "math/rand"
|
11 | 11 | "strconv"
|
12 |
| - "sync" |
13 | 12 | "testing"
|
14 | 13 | "time"
|
15 | 14 |
|
@@ -363,71 +362,46 @@ func TestMeasureLatency(t *testing.T) {
|
363 | 362 | ps, done := newPubsub()
|
364 | 363 | defer done()
|
365 | 364 |
|
366 |
| - slow := newDelayedPublisher(ps, time.Second) |
367 |
| - fast := newDelayedPublisher(ps, time.Nanosecond) |
368 |
| - hold := make(chan struct{}, 1) |
369 |
| - |
370 |
| - // Start two goroutines in which two subscribers are registered but the messages are received out-of-order because |
371 |
| - // the first Pubsub will publish its message slowly and the second will publish it quickly. Both will ultimately |
372 |
| - // receive their desired messages, but the slow publisher will receive an unexpected message first. |
373 |
| - var wg sync.WaitGroup |
374 |
| - wg.Add(2) |
375 |
| - go func() { |
376 |
| - ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort) |
377 |
| - defer cancel() |
378 |
| - defer wg.Done() |
379 |
| - |
380 |
| - hold <- struct{}{} |
381 |
| - l := lm.Measure(ctx, slow) |
382 |
| - assert.NoError(t, l.Err) |
383 |
| - assert.Greater(t, l.Send.Seconds(), 0.0) |
384 |
| - assert.Greater(t, l.Recv.Seconds(), 0.0) |
385 |
| - |
386 |
| - // The slow subscriber will complete first and receive the fast publisher's message first. |
387 |
| - logger.Sync() |
388 |
| - assert.Contains(t, buf.String(), "received unexpected message") |
389 |
| - }() |
| 365 | + racy := newRacyPubsub(ps) |
| 366 | + ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort) |
| 367 | + defer cancel() |
390 | 368 |
|
391 |
| - go func() { |
392 |
| - ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort) |
393 |
| - defer cancel() |
394 |
| - defer wg.Done() |
395 |
| - |
396 |
| - // Force fast publisher to start after the slow one to avoid flakiness. |
397 |
| - <-hold |
398 |
| - time.Sleep(time.Millisecond * 50) |
399 |
| - l := lm.Measure(ctx, fast) |
400 |
| - assert.NoError(t, l.Err) |
401 |
| - assert.Greater(t, l.Send.Seconds(), 0.0) |
402 |
| - assert.Greater(t, l.Recv.Seconds(), 0.0) |
403 |
| - }() |
| 369 | + l := lm.Measure(ctx, racy) |
| 370 | + assert.NoError(t, l.Err) |
| 371 | + assert.Greater(t, l.Send.Seconds(), 0.0) |
| 372 | + assert.Greater(t, l.Recv.Seconds(), 0.0) |
404 | 373 |
|
405 |
| - wg.Wait() |
| 374 | + logger.Sync() |
| 375 | + assert.Contains(t, buf.String(), "received unexpected message") |
406 | 376 | })
|
407 | 377 | }
|
408 | 378 |
|
409 |
| -type delayedPublisher struct { |
| 379 | +// racyPubsub simulates a race on the same channel by publishing two messages (one expected, one not). |
| 380 | +// This is used to verify that a subscriber will only listen for the message it explicitly expects. |
| 381 | +type racyPubsub struct { |
410 | 382 | pubsub.Pubsub
|
411 |
| - delay time.Duration |
412 | 383 | }
|
413 | 384 |
|
414 |
| -func newDelayedPublisher(ps pubsub.Pubsub, delay time.Duration) *delayedPublisher { |
415 |
| - return &delayedPublisher{Pubsub: ps, delay: delay} |
| 385 | +func newRacyPubsub(ps pubsub.Pubsub) *racyPubsub { |
| 386 | + return &racyPubsub{ps} |
416 | 387 | }
|
417 | 388 |
|
418 |
| -func (s *delayedPublisher) Subscribe(event string, listener pubsub.Listener) (cancel func(), err error) { |
| 389 | +func (s *racyPubsub) Subscribe(event string, listener pubsub.Listener) (cancel func(), err error) { |
419 | 390 | return s.Pubsub.Subscribe(event, listener)
|
420 | 391 | }
|
421 | 392 |
|
422 |
| -func (s *delayedPublisher) SubscribeWithErr(event string, listener pubsub.ListenerWithErr) (cancel func(), err error) { |
| 393 | +func (s *racyPubsub) SubscribeWithErr(event string, listener pubsub.ListenerWithErr) (cancel func(), err error) { |
423 | 394 | return s.Pubsub.SubscribeWithErr(event, listener)
|
424 | 395 | }
|
425 | 396 |
|
426 |
| -func (s *delayedPublisher) Publish(event string, message []byte) error { |
427 |
| - time.Sleep(s.delay) |
| 397 | +func (s *racyPubsub) Publish(event string, message []byte) error { |
| 398 | + err := s.Pubsub.Publish(event, []byte("nonsense")) |
| 399 | + if err != nil { |
| 400 | + return xerrors.Errorf("failed to send simulated race: %w", err) |
| 401 | + } |
428 | 402 | return s.Pubsub.Publish(event, message)
|
429 | 403 | }
|
430 | 404 |
|
431 |
| -func (s *delayedPublisher) Close() error { |
| 405 | +func (s *racyPubsub) Close() error { |
432 | 406 | return s.Pubsub.Close()
|
433 | 407 | }
|
0 commit comments