Skip to content

Commit e324a42

Browse files
committed
Implement observability of notification subsystem
Minor refactoring to make testing easier Signed-off-by: Danny Kopping <danny@coder.com>
1 parent bc4125b commit e324a42

17 files changed

+634
-137
lines changed

cli/server.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -983,6 +983,7 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
983983
)
984984
if experiments.Enabled(codersdk.ExperimentNotifications) {
985985
cfg := options.DeploymentValues.Notifications
986+
metrics := notifications.NewMetrics(options.PrometheusRegistry)
986987

987988
// The enqueuer is responsible for enqueueing notifications to the given store.
988989
enqueuer, err := notifications.NewStoreEnqueuer(cfg, options.Database, templateHelpers(options), logger.Named("notifications.enqueuer"))
@@ -994,7 +995,7 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
994995
// The notification manager is responsible for:
995996
// - creating notifiers and managing their lifecycles (notifiers are responsible for dequeueing/sending notifications)
996997
// - keeping the store updated with status updates
997-
notificationsManager, err = notifications.NewManager(cfg, options.Database, logger.Named("notifications.manager"))
998+
notificationsManager, err = notifications.NewManager(cfg, options.Database, metrics, logger.Named("notifications.manager"))
998999
if err != nil {
9991000
return xerrors.Errorf("failed to instantiate notification manager: %w", err)
10001001
}

coderd/database/dbmem/dbmem.go

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -929,15 +929,20 @@ func (q *FakeQuerier) AcquireNotificationMessages(_ context.Context, arg databas
929929
return nil, err
930930
}
931931

932+
// Shift the first "Count" notifications off the slice (FIFO).
933+
sz := len(q.notificationMessages)
934+
if sz > int(arg.Count) {
935+
sz = int(arg.Count)
936+
}
937+
938+
list := q.notificationMessages[:sz]
939+
q.notificationMessages = q.notificationMessages[sz:]
940+
932941
q.mutex.Lock()
933942
defer q.mutex.Unlock()
934943

935944
var out []database.AcquireNotificationMessagesRow
936-
for _, nm := range q.notificationMessages {
937-
if len(out) >= int(arg.Count) {
938-
break
939-
}
940-
945+
for _, nm := range list {
941946
acquirableStatuses := []database.NotificationMessageStatus{database.NotificationMessageStatusPending, database.NotificationMessageStatusTemporaryFailure}
942947
if !slices.Contains(acquirableStatuses, nm.Status) {
943948
continue
@@ -953,9 +958,9 @@ func (q *FakeQuerier) AcquireNotificationMessages(_ context.Context, arg databas
953958
ID: nm.ID,
954959
Payload: nm.Payload,
955960
Method: nm.Method,
956-
CreatedBy: nm.CreatedBy,
957961
TitleTemplate: "This is a title with {{.Labels.variable}}",
958962
BodyTemplate: "This is a body with {{.Labels.variable}}",
963+
TemplateID: nm.NotificationTemplateID,
959964
})
960965
}
961966

@@ -1229,15 +1234,15 @@ func (*FakeQuerier) BulkMarkNotificationMessagesFailed(_ context.Context, arg da
12291234
if err != nil {
12301235
return 0, err
12311236
}
1232-
return -1, nil
1237+
return int64(len(arg.IDs)), nil
12331238
}
12341239

12351240
func (*FakeQuerier) BulkMarkNotificationMessagesSent(_ context.Context, arg database.BulkMarkNotificationMessagesSentParams) (int64, error) {
12361241
err := validateDatabaseType(arg)
12371242
if err != nil {
12381243
return 0, err
12391244
}
1240-
return -1, nil
1245+
return int64(len(arg.IDs)), nil
12411246
}
12421247

12431248
func (*FakeQuerier) CleanTailnetCoordinators(_ context.Context) error {

coderd/database/dump.sql

Lines changed: 2 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
ALTER TABLE notification_messages
2+
DROP COLUMN IF EXISTS queued_seconds;
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
ALTER TABLE notification_messages
2+
ADD COLUMN queued_seconds FLOAT NULL;

coderd/database/models.go

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/queries.sql.go

Lines changed: 15 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/queries/notifications.sql

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@ RETURNING *;
3636
WITH acquired AS (
3737
UPDATE
3838
notification_messages
39-
SET updated_at = NOW(),
39+
SET queued_seconds = GREATEST(0, EXTRACT(EPOCH FROM (NOW() - updated_at)))::FLOAT,
40+
updated_at = NOW(),
4041
status = 'leased'::notification_message_status,
4142
status_reason = 'Leased by notifier ' || sqlc.arg('notifier_id')::uuid,
4243
leased_until = NOW() + CONCAT(sqlc.arg('lease_seconds')::int, ' seconds')::interval
@@ -78,8 +79,10 @@ SELECT
7879
nm.id,
7980
nm.payload,
8081
nm.method,
81-
nm.created_by,
82+
nm.attempt_count::int AS attempt_count,
83+
nm.queued_seconds::float AS queued_seconds,
8284
-- template
85+
nt.id AS template_id,
8386
nt.title_template,
8487
nt.body_template
8588
FROM acquired nm
@@ -111,7 +114,7 @@ SET updated_at = new_values.sent_at,
111114
status_reason = NULL,
112115
leased_until = NULL,
113116
next_retry_after = NULL
114-
FROM (SELECT UNNEST(@ids::uuid[]) AS id,
117+
FROM (SELECT UNNEST(@ids::uuid[]) AS id,
115118
UNNEST(@sent_ats::timestamptz[]) AS sent_at)
116119
AS new_values
117120
WHERE notification_messages.id = new_values.id;

coderd/notifications/manager.go

Lines changed: 25 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ type Manager struct {
4343

4444
notifier *notifier
4545
handlers map[database.NotificationMethod]Handler
46+
method database.NotificationMethod
47+
48+
metrics *Metrics
4649

4750
success, failure chan dispatchResult
4851

@@ -56,7 +59,16 @@ type Manager struct {
5659
//
5760
// helpers is a map of template helpers which are used to customize notification messages to use global settings like
5861
// access URL etc.
59-
func NewManager(cfg codersdk.NotificationsConfig, store Store, log slog.Logger) (*Manager, error) {
62+
func NewManager(cfg codersdk.NotificationsConfig, store Store, metrics *Metrics, log slog.Logger) (*Manager, error) {
63+
if metrics == nil {
64+
panic("nil metrics passed to notifications manager")
65+
}
66+
67+
method, err := dispatchMethodFromCfg(cfg)
68+
if err != nil {
69+
return nil, err
70+
}
71+
6072
// If dispatch timeout exceeds lease period, it is possible that messages can be delivered in duplicate because the
6173
// lease can expire before the notifier gives up on the dispatch, which results in the message becoming eligible for
6274
// being re-acquired.
@@ -78,6 +90,9 @@ func NewManager(cfg codersdk.NotificationsConfig, store Store, log slog.Logger)
7890
success: make(chan dispatchResult, cfg.StoreSyncBufferSize),
7991
failure: make(chan dispatchResult, cfg.StoreSyncBufferSize),
8092

93+
metrics: metrics,
94+
method: method,
95+
8196
stop: make(chan any),
8297
done: make(chan any),
8398

@@ -137,7 +152,7 @@ func (m *Manager) loop(ctx context.Context) error {
137152
var eg errgroup.Group
138153

139154
// Create a notifier to run concurrently, which will handle dequeueing and dispatching notifications.
140-
m.notifier = newNotifier(m.cfg, uuid.New(), m.log, m.store, m.handlers)
155+
m.notifier = newNotifier(m.cfg, uuid.New(), m.log, m.store, m.handlers, m.method, m.metrics)
141156
eg.Go(func() error {
142157
return m.notifier.run(ctx, m.success, m.failure)
143158
})
@@ -171,12 +186,12 @@ func (m *Manager) loop(ctx context.Context) error {
171186
if len(m.success)+len(m.failure) > 0 {
172187
m.log.Warn(ctx, "flushing buffered updates before stop",
173188
slog.F("success_count", len(m.success)), slog.F("failure_count", len(m.failure)))
174-
m.bulkUpdate(ctx)
189+
m.syncUpdates(ctx)
175190
m.log.Warn(ctx, "flushing updates done")
176191
}
177192
return nil
178193
case <-tick.C:
179-
m.bulkUpdate(ctx)
194+
m.syncUpdates(ctx)
180195
}
181196
}
182197
})
@@ -194,8 +209,8 @@ func (m *Manager) BufferedUpdatesCount() (success int, failure int) {
194209
return len(m.success), len(m.failure)
195210
}
196211

197-
// bulkUpdate updates messages in the store based on the given successful and failed message dispatch results.
198-
func (m *Manager) bulkUpdate(ctx context.Context) {
212+
// syncUpdates updates messages in the store based on the given successful and failed message dispatch results.
213+
func (m *Manager) syncUpdates(ctx context.Context) {
199214
select {
200215
case <-ctx.Done():
201216
return
@@ -205,6 +220,10 @@ func (m *Manager) bulkUpdate(ctx context.Context) {
205220
nSuccess := len(m.success)
206221
nFailure := len(m.failure)
207222

223+
defer func() {
224+
m.metrics.PendingUpdates.Set(float64(len(m.success) + len(m.failure)))
225+
}()
226+
208227
// Nothing to do.
209228
if nSuccess+nFailure == 0 {
210229
return
@@ -347,21 +366,3 @@ type dispatchResult struct {
347366
err error
348367
retryable bool
349368
}
350-
351-
func newSuccessfulDispatch(notifier, msg uuid.UUID) dispatchResult {
352-
return dispatchResult{
353-
notifier: notifier,
354-
msg: msg,
355-
ts: time.Now(),
356-
}
357-
}
358-
359-
func newFailedDispatch(notifier, msg uuid.UUID, err error, retryable bool) dispatchResult {
360-
return dispatchResult{
361-
notifier: notifier,
362-
msg: msg,
363-
ts: time.Now(),
364-
err: err,
365-
retryable: retryable,
366-
}
367-
}

coderd/notifications/manager_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,13 @@ func TestBufferedUpdates(t *testing.T) {
3535
}
3636

3737
ctx, logger, db := setup(t)
38-
interceptor := &bulkUpdateInterceptor{Store: db}
38+
interceptor := &syncInterceptor{Store: db}
3939
santa := &santaHandler{}
4040

4141
cfg := defaultNotificationsConfig(database.NotificationMethodSmtp)
4242
cfg.StoreSyncInterval = serpent.Duration(time.Hour) // Ensure we don't sync the store automatically.
4343

44-
mgr, err := notifications.NewManager(cfg, interceptor, logger.Named("notifications-manager"))
44+
mgr, err := notifications.NewManager(cfg, interceptor, createMetrics(), logger.Named("notifications-manager"))
4545
require.NoError(t, err)
4646
mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{
4747
database.NotificationMethodSmtp: santa,
@@ -153,7 +153,7 @@ func TestStopBeforeRun(t *testing.T) {
153153

154154
ctx := context.Background()
155155
logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true, IgnoredErrorIs: []error{}}).Leveled(slog.LevelDebug)
156-
mgr, err := notifications.NewManager(defaultNotificationsConfig(database.NotificationMethodSmtp), dbmem.New(), logger.Named("notifications-manager"))
156+
mgr, err := notifications.NewManager(defaultNotificationsConfig(database.NotificationMethodSmtp), dbmem.New(), createMetrics(), logger.Named("notifications-manager"))
157157
require.NoError(t, err)
158158

159159
// Call stop before notifier is started with Run().
@@ -163,15 +163,15 @@ func TestStopBeforeRun(t *testing.T) {
163163
}, testutil.WaitShort, testutil.IntervalFast)
164164
}
165165

166-
type bulkUpdateInterceptor struct {
166+
type syncInterceptor struct {
167167
notifications.Store
168168

169169
sent atomic.Int32
170170
failed atomic.Int32
171171
err atomic.Value
172172
}
173173

174-
func (b *bulkUpdateInterceptor) BulkMarkNotificationMessagesSent(ctx context.Context, arg database.BulkMarkNotificationMessagesSentParams) (int64, error) {
174+
func (b *syncInterceptor) BulkMarkNotificationMessagesSent(ctx context.Context, arg database.BulkMarkNotificationMessagesSentParams) (int64, error) {
175175
updated, err := b.Store.BulkMarkNotificationMessagesSent(ctx, arg)
176176
b.sent.Add(int32(updated))
177177
if err != nil {
@@ -180,7 +180,7 @@ func (b *bulkUpdateInterceptor) BulkMarkNotificationMessagesSent(ctx context.Con
180180
return updated, err
181181
}
182182

183-
func (b *bulkUpdateInterceptor) BulkMarkNotificationMessagesFailed(ctx context.Context, arg database.BulkMarkNotificationMessagesFailedParams) (int64, error) {
183+
func (b *syncInterceptor) BulkMarkNotificationMessagesFailed(ctx context.Context, arg database.BulkMarkNotificationMessagesFailedParams) (int64, error) {
184184
updated, err := b.Store.BulkMarkNotificationMessagesFailed(ctx, arg)
185185
b.failed.Add(int32(updated))
186186
if err != nil {

0 commit comments

Comments
 (0)