Skip to content

fix: fix TestPendingUpdatesMetric flaky assertion #14534

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 24 additions & 5 deletions coderd/notifications/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"golang.org/x/xerrors"

"cdr.dev/slog"
"github.com/coder/quartz"

"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/notifications/dispatch"
Expand Down Expand Up @@ -54,13 +55,25 @@ type Manager struct {
stopOnce sync.Once
stop chan any
done chan any

// clock is for testing only
clock quartz.Clock
}

type ManagerOption func(*Manager)

// WithTestClock returns a ManagerOption that overrides the Manager's quartz
// clock. It exists for tests that need deterministic control over time
// (e.g. by injecting a quartz mock clock).
func WithTestClock(clock quartz.Clock) ManagerOption {
	return func(mgr *Manager) {
		mgr.clock = clock
	}
}

// NewManager instantiates a new Manager instance which coordinates notification enqueuing and delivery.
//
// helpers is a map of template helpers which are used to customize notification messages to use global settings like
// access URL etc.
func NewManager(cfg codersdk.NotificationsConfig, store Store, helpers template.FuncMap, metrics *Metrics, log slog.Logger) (*Manager, error) {
func NewManager(cfg codersdk.NotificationsConfig, store Store, helpers template.FuncMap, metrics *Metrics, log slog.Logger, opts ...ManagerOption) (*Manager, error) {
// TODO(dannyk): add the ability to use multiple notification methods.
var method database.NotificationMethod
if err := method.Scan(cfg.Method.String()); err != nil {
Expand All @@ -74,7 +87,7 @@ func NewManager(cfg codersdk.NotificationsConfig, store Store, helpers template.
return nil, ErrInvalidDispatchTimeout
}

return &Manager{
m := &Manager{
log: log,
cfg: cfg,
store: store,
Expand All @@ -95,7 +108,13 @@ func NewManager(cfg codersdk.NotificationsConfig, store Store, helpers template.
done: make(chan any),

handlers: defaultHandlers(cfg, helpers, log),
}, nil

clock: quartz.NewReal(),
}
for _, o := range opts {
o(m)
}
return m, nil
}

// defaultHandlers builds a set of known handlers; panics if any error occurs as these handlers should be valid at compile time.
Expand Down Expand Up @@ -150,15 +169,15 @@ func (m *Manager) loop(ctx context.Context) error {
var eg errgroup.Group

// Create a notifier to run concurrently, which will handle dequeueing and dispatching notifications.
m.notifier = newNotifier(m.cfg, uuid.New(), m.log, m.store, m.handlers, m.metrics)
m.notifier = newNotifier(m.cfg, uuid.New(), m.log, m.store, m.handlers, m.metrics, m.clock)
eg.Go(func() error {
return m.notifier.run(ctx, m.success, m.failure)
})

// Periodically flush notification state changes to the store.
eg.Go(func() error {
// Every interval, collect the messages in the channels and bulk update them in the store.
tick := time.NewTicker(m.cfg.StoreSyncInterval.Value())
tick := m.clock.NewTicker(m.cfg.StoreSyncInterval.Value(), "Manager", "storeSync")
defer tick.Stop()
for {
select {
Expand Down
47 changes: 18 additions & 29 deletions coderd/notifications/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,13 +221,16 @@ func TestPendingUpdatesMetric(t *testing.T) {

// GIVEN: a notification manager whose store updates are intercepted so we can read the number of pending updates set in the metric
cfg := defaultNotificationsConfig(method)
cfg.FetchInterval = serpent.Duration(time.Millisecond * 50)
cfg.RetryInterval = serpent.Duration(time.Hour) // Delay retries so they don't interfere.
cfg.StoreSyncInterval = serpent.Duration(time.Millisecond * 100)

syncer := &syncInterceptor{Store: api.Database}
interceptor := newUpdateSignallingInterceptor(syncer)
mgr, err := notifications.NewManager(cfg, interceptor, defaultHelpers(), metrics, api.Logger.Named("manager"))
mClock := quartz.NewMock(t)
trap := mClock.Trap().NewTicker("Manager", "storeSync")
defer trap.Close()
mgr, err := notifications.NewManager(cfg, interceptor, defaultHelpers(), metrics, api.Logger.Named("manager"),
notifications.WithTestClock(mClock))
require.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, mgr.Stop(ctx))
Expand All @@ -249,6 +252,7 @@ func TestPendingUpdatesMetric(t *testing.T) {
require.NoError(t, err)

mgr.Run(ctx)
trap.MustWait(ctx).Release() // ensures ticker has been set

// THEN:
// Wait until the handler has dispatched the given notifications.
Expand All @@ -259,17 +263,20 @@ func TestPendingUpdatesMetric(t *testing.T) {
return len(handler.succeeded) == 1 && len(handler.failed) == 1
}, testutil.WaitShort, testutil.IntervalFast)

// Wait until we intercept the calls to sync the pending updates to the store.
success := testutil.RequireRecvCtx(testutil.Context(t, testutil.WaitShort), t, interceptor.updateSuccess)
failure := testutil.RequireRecvCtx(testutil.Context(t, testutil.WaitShort), t, interceptor.updateFailure)

// Wait for the metric to be updated with the expected count of metrics.
// Both handler calls should be pending in the metrics.
require.Eventually(t, func() bool {
return promtest.ToFloat64(metrics.PendingUpdates) == float64(success+failure)
return promtest.ToFloat64(metrics.PendingUpdates) == float64(2)
}, testutil.WaitShort, testutil.IntervalFast)

// Unpause the interceptor so the updates can proceed.
interceptor.unpause()
// THEN:
// Trigger syncing updates
mClock.Advance(cfg.StoreSyncInterval.Value()).MustWait(ctx)

// Wait until we intercept the calls to sync the pending updates to the store.
success := testutil.RequireRecvCtx(testutil.Context(t, testutil.WaitShort), t, interceptor.updateSuccess)
require.EqualValues(t, 1, success)
failure := testutil.RequireRecvCtx(testutil.Context(t, testutil.WaitShort), t, interceptor.updateFailure)
require.EqualValues(t, 1, failure)

// Validate that the store synced the expected number of updates.
require.Eventually(t, func() bool {
Expand Down Expand Up @@ -464,43 +471,25 @@ func fingerprintLabels(lbs ...string) model.Fingerprint {
// signaled by the caller so it can continue.
type updateSignallingInterceptor struct {
	notifications.Store

	// pause is closed by unpause to release Bulk* calls blocked on it.
	pause chan any

	// updateSuccess and updateFailure receive the number of message IDs in
	// each intercepted bulk sent/failed sync, letting the test observe the
	// batch sizes before the sync proceeds.
	updateSuccess chan int
	updateFailure chan int
}

// newUpdateSignallingInterceptor wraps the given notifications.Store so the
// test can observe the size of each bulk sync (via updateSuccess/updateFailure)
// and hold the sync back until unpause is called.
//
// Note: the original span contained both sides of a rendered diff, which left
// a duplicate `Store: interceptor` key in the composite literal — a compile
// error in Go. This version keeps a single coherent initializer.
func newUpdateSignallingInterceptor(interceptor notifications.Store) *updateSignallingInterceptor {
	return &updateSignallingInterceptor{
		Store: interceptor,

		pause: make(chan any, 1),

		// Buffered (size 1) so the interceptor can report a batch size
		// without requiring the test to be receiving at that instant.
		updateSuccess: make(chan int, 1),
		updateFailure: make(chan int, 1),
	}
}

// unpause closes the pause channel, releasing every Bulk* call currently (or
// subsequently) blocked on <-u.pause so the intercepted syncs can proceed.
func (u *updateSignallingInterceptor) unpause() {
	close(u.pause)
}

// BulkMarkNotificationMessagesSent reports the batch size on updateSuccess,
// then blocks until unpause closes the pause channel before delegating to the
// wrapped store. The block gives the test a window to read the pending-updates
// metric while these updates are still queued.
func (u *updateSignallingInterceptor) BulkMarkNotificationMessagesSent(ctx context.Context, arg database.BulkMarkNotificationMessagesSentParams) (int64, error) {
	u.updateSuccess <- len(arg.IDs)

	// Wait until signaled so we have a chance to read the number of pending updates.
	<-u.pause

	return u.Store.BulkMarkNotificationMessagesSent(ctx, arg)
}

// BulkMarkNotificationMessagesFailed reports the batch size on updateFailure,
// then blocks until unpause closes the pause channel before delegating to the
// wrapped store. Mirrors BulkMarkNotificationMessagesSent for the failure path.
func (u *updateSignallingInterceptor) BulkMarkNotificationMessagesFailed(ctx context.Context, arg database.BulkMarkNotificationMessagesFailedParams) (int64, error) {
	u.updateFailure <- len(arg.IDs)

	// Wait until signaled so we have a chance to read the number of pending updates.
	<-u.pause

	return u.Store.BulkMarkNotificationMessagesFailed(ctx, arg)
}

Expand Down
25 changes: 16 additions & 9 deletions coderd/notifications/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"sync"
"time"

"github.com/google/uuid"
"golang.org/x/sync/errgroup"
Expand All @@ -15,6 +14,7 @@ import (
"github.com/coder/coder/v2/coderd/notifications/render"
"github.com/coder/coder/v2/coderd/notifications/types"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/quartz"

"cdr.dev/slog"

Expand All @@ -29,26 +29,33 @@ type notifier struct {
log slog.Logger
store Store

tick *time.Ticker
tick *quartz.Ticker
stopOnce sync.Once
quit chan any
done chan any

handlers map[database.NotificationMethod]Handler
metrics *Metrics

// clock is for testing
clock quartz.Clock
}

func newNotifier(cfg codersdk.NotificationsConfig, id uuid.UUID, log slog.Logger, db Store, hr map[database.NotificationMethod]Handler, metrics *Metrics) *notifier {
func newNotifier(cfg codersdk.NotificationsConfig, id uuid.UUID, log slog.Logger, db Store,
hr map[database.NotificationMethod]Handler, metrics *Metrics, clock quartz.Clock,
) *notifier {
tick := clock.NewTicker(cfg.FetchInterval.Value(), "notifier", "fetchInterval")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I love these ticker tags ❤️

return &notifier{
id: id,
cfg: cfg,
log: log.Named("notifier").With(slog.F("notifier_id", id)),
quit: make(chan any),
done: make(chan any),
tick: time.NewTicker(cfg.FetchInterval.Value()),
tick: tick,
store: db,
handlers: hr,
metrics: metrics,
clock: clock,
}
}

Expand Down Expand Up @@ -245,10 +252,10 @@ func (n *notifier) deliver(ctx context.Context, msg database.AcquireNotification
n.metrics.InflightDispatches.WithLabelValues(string(msg.Method), msg.TemplateID.String()).Inc()
n.metrics.QueuedSeconds.WithLabelValues(string(msg.Method)).Observe(msg.QueuedSeconds)

start := time.Now()
start := n.clock.Now()
retryable, err := deliver(ctx, msg.ID)

n.metrics.DispatcherSendSeconds.WithLabelValues(string(msg.Method)).Observe(time.Since(start).Seconds())
n.metrics.DispatcherSendSeconds.WithLabelValues(string(msg.Method)).Observe(n.clock.Since(start).Seconds())
n.metrics.InflightDispatches.WithLabelValues(string(msg.Method), msg.TemplateID.String()).Dec()

if err != nil {
Expand Down Expand Up @@ -291,7 +298,7 @@ func (n *notifier) newSuccessfulDispatch(msg database.AcquireNotificationMessage
return dispatchResult{
notifier: n.id,
msg: msg.ID,
ts: dbtime.Now(),
ts: dbtime.Time(n.clock.Now().UTC()),
}
}

Expand All @@ -311,7 +318,7 @@ func (n *notifier) newFailedDispatch(msg database.AcquireNotificationMessagesRow
return dispatchResult{
notifier: n.id,
msg: msg.ID,
ts: dbtime.Now(),
ts: dbtime.Time(n.clock.Now().UTC()),
err: err,
retryable: retryable,
}
Expand All @@ -321,7 +328,7 @@ func (n *notifier) newInhibitedDispatch(msg database.AcquireNotificationMessages
return dispatchResult{
notifier: n.id,
msg: msg.ID,
ts: dbtime.Now(),
ts: dbtime.Time(n.clock.Now().UTC()),
retryable: false,
inhibited: true,
}
Expand Down
Loading