Skip to content

Commit d26216e

Browse files
committed
add org id to provisioner domain key
1 parent eb7b156 commit d26216e

File tree

3 files changed

+34
-21
lines changed

3 files changed

+34
-21
lines changed

coderd/provisionerdserver/acquirer.go

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -89,24 +89,25 @@ func NewAcquirer(ctx context.Context, logger slog.Logger, store AcquirerStore, p
8989
// done, or the database returns an error _other_ than that no jobs are available.
9090
// If no jobs are available, this method handles retrying as appropriate.
9191
func (a *Acquirer) AcquireJob(
92-
ctx context.Context, worker uuid.UUID, pt []database.ProvisionerType, tags Tags,
92+
ctx context.Context, organization uuid.UUID, worker uuid.UUID, pt []database.ProvisionerType, tags Tags,
9393
) (
9494
retJob database.ProvisionerJob, retErr error,
9595
) {
9696
logger := a.logger.With(
97+
slog.F("organization_id", organization),
9798
slog.F("worker_id", worker),
9899
slog.F("provisioner_types", pt),
99100
slog.F("tags", tags))
100101
logger.Debug(ctx, "acquiring job")
101-
dk := domainKey(pt, tags)
102+
dk := domainKey(organization, pt, tags)
102103
dbTags, err := tags.ToJSON()
103104
if err != nil {
104105
return database.ProvisionerJob{}, err
105106
}
106107
// buffer of 1 so that cancel doesn't deadlock while writing to the channel
107108
clearance := make(chan struct{}, 1)
108109
for {
109-
a.want(pt, tags, clearance)
110+
a.want(organization, pt, tags, clearance)
110111
select {
111112
case <-ctx.Done():
112113
err := ctx.Err()
@@ -152,8 +153,8 @@ func (a *Acquirer) AcquireJob(
152153
}
153154

154155
// want signals that an acquiree wants clearance to query for a job with the given dKey.
155-
func (a *Acquirer) want(pt []database.ProvisionerType, tags Tags, clearance chan<- struct{}) {
156-
dk := domainKey(pt, tags)
156+
func (a *Acquirer) want(organization uuid.UUID, pt []database.ProvisionerType, tags Tags, clearance chan<- struct{}) {
157+
dk := domainKey(organization, pt, tags)
157158
a.mu.Lock()
158159
defer a.mu.Unlock()
159160
cleared := false
@@ -404,13 +405,16 @@ type dKey string
404405
// unprintable control character and won't show up in any "reasonable" set of
405406
// string tags, even in non-Latin scripts. It is important that Tags are
406407
// validated not to contain this control character prior to use.
407-
func domainKey(pt []database.ProvisionerType, tags Tags) dKey {
408+
func domainKey(orgID uuid.UUID, pt []database.ProvisionerType, tags Tags) dKey {
409+
sb := strings.Builder{}
410+
_, _ = sb.WriteString(orgID.String())
411+
_ = sb.WriteByte(0x00)
412+
408413
// make a copy of pt before sorting, so that we don't mutate the original
409414
// slice or underlying array.
410415
pts := make([]database.ProvisionerType, len(pt))
411416
copy(pts, pt)
412417
slices.Sort(pts)
413-
sb := strings.Builder{}
414418
for _, t := range pts {
415419
_, _ = sb.WriteString(string(t))
416420
_ = sb.WriteByte(0x00)

coderd/provisionerdserver/acquirer_test.go

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -53,12 +53,13 @@ func TestAcquirer_Single(t *testing.T) {
5353
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
5454
uut := provisionerdserver.NewAcquirer(ctx, logger.Named("acquirer"), fs, ps)
5555

56+
orgID := uuid.New()
5657
workerID := uuid.New()
5758
pt := []database.ProvisionerType{database.ProvisionerTypeEcho}
5859
tags := provisionerdserver.Tags{
5960
"environment": "on-prem",
6061
}
61-
acquiree := newTestAcquiree(t, workerID, pt, tags)
62+
acquiree := newTestAcquiree(t, orgID, workerID, pt, tags)
6263
jobID := uuid.New()
6364
err := fs.sendCtx(ctx, database.ProvisionerJob{ID: jobID}, nil)
6465
require.NoError(t, err)
@@ -82,14 +83,15 @@ func TestAcquirer_MultipleSameDomain(t *testing.T) {
8283
acquirees := make([]*testAcquiree, 0, 10)
8384
jobIDs := make(map[uuid.UUID]bool)
8485
workerIDs := make(map[uuid.UUID]bool)
86+
orgID := uuid.New()
8587
pt := []database.ProvisionerType{database.ProvisionerTypeEcho}
8688
tags := provisionerdserver.Tags{
8789
"environment": "on-prem",
8890
}
8991
for i := 0; i < 10; i++ {
9092
wID := uuid.New()
9193
workerIDs[wID] = true
92-
a := newTestAcquiree(t, wID, pt, tags)
94+
a := newTestAcquiree(t, orgID, wID, pt, tags)
9395
acquirees = append(acquirees, a)
9496
a.startAcquire(ctx, uut)
9597
}
@@ -124,12 +126,13 @@ func TestAcquirer_WaitsOnNoJobs(t *testing.T) {
124126
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
125127
uut := provisionerdserver.NewAcquirer(ctx, logger.Named("acquirer"), fs, ps)
126128

129+
orgID := uuid.New()
127130
workerID := uuid.New()
128131
pt := []database.ProvisionerType{database.ProvisionerTypeEcho}
129132
tags := provisionerdserver.Tags{
130133
"environment": "on-prem",
131134
}
132-
acquiree := newTestAcquiree(t, workerID, pt, tags)
135+
acquiree := newTestAcquiree(t, orgID, workerID, pt, tags)
133136
jobID := uuid.New()
134137
err := fs.sendCtx(ctx, database.ProvisionerJob{}, sql.ErrNoRows)
135138
require.NoError(t, err)
@@ -175,12 +178,13 @@ func TestAcquirer_RetriesPending(t *testing.T) {
175178
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
176179
uut := provisionerdserver.NewAcquirer(ctx, logger.Named("acquirer"), fs, ps)
177180

181+
orgID := uuid.New()
178182
workerID := uuid.New()
179183
pt := []database.ProvisionerType{database.ProvisionerTypeEcho}
180184
tags := provisionerdserver.Tags{
181185
"environment": "on-prem",
182186
}
183-
acquiree := newTestAcquiree(t, workerID, pt, tags)
187+
acquiree := newTestAcquiree(t, orgID, workerID, pt, tags)
184188
jobID := uuid.New()
185189

186190
acquiree.startAcquire(ctx, uut)
@@ -217,17 +221,18 @@ func TestAcquirer_DifferentDomains(t *testing.T) {
217221
defer cancel()
218222
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
219223

224+
orgID := uuid.New()
220225
pt := []database.ProvisionerType{database.ProvisionerTypeEcho}
221226
worker0 := uuid.New()
222227
tags0 := provisionerdserver.Tags{
223228
"worker": "0",
224229
}
225-
acquiree0 := newTestAcquiree(t, worker0, pt, tags0)
230+
acquiree0 := newTestAcquiree(t, orgID, worker0, pt, tags0)
226231
worker1 := uuid.New()
227232
tags1 := provisionerdserver.Tags{
228233
"worker": "1",
229234
}
230-
acquiree1 := newTestAcquiree(t, worker1, pt, tags1)
235+
acquiree1 := newTestAcquiree(t, orgID, worker1, pt, tags1)
231236
jobID := uuid.New()
232237
fs.jobs = []database.ProvisionerJob{
233238
{ID: jobID, Provisioner: database.ProvisionerTypeEcho, Tags: database.StringMap{"worker": "1"}},
@@ -268,11 +273,12 @@ func TestAcquirer_BackupPoll(t *testing.T) {
268273
)
269274

270275
workerID := uuid.New()
276+
orgID := uuid.New()
271277
pt := []database.ProvisionerType{database.ProvisionerTypeEcho}
272278
tags := provisionerdserver.Tags{
273279
"environment": "on-prem",
274280
}
275-
acquiree := newTestAcquiree(t, workerID, pt, tags)
281+
acquiree := newTestAcquiree(t, orgID, workerID, pt, tags)
276282
jobID := uuid.New()
277283
err := fs.sendCtx(ctx, database.ProvisionerJob{}, sql.ErrNoRows)
278284
require.NoError(t, err)
@@ -294,13 +300,14 @@ func TestAcquirer_UnblockOnCancel(t *testing.T) {
294300
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
295301

296302
pt := []database.ProvisionerType{database.ProvisionerTypeEcho}
303+
orgID := uuid.New()
297304
worker0 := uuid.New()
298305
tags := provisionerdserver.Tags{
299306
"environment": "on-prem",
300307
}
301-
acquiree0 := newTestAcquiree(t, worker0, pt, tags)
308+
acquiree0 := newTestAcquiree(t, orgID, worker0, pt, tags)
302309
worker1 := uuid.New()
303-
acquiree1 := newTestAcquiree(t, worker1, pt, tags)
310+
acquiree1 := newTestAcquiree(t, orgID, worker1, pt, tags)
304311
jobID := uuid.New()
305312

306313
uut := provisionerdserver.NewAcquirer(ctx, logger.Named("acquirer"), fs, ps)
@@ -486,7 +493,7 @@ func TestAcquirer_MatchTags(t *testing.T) {
486493
require.NoError(t, err)
487494
ptypes := []database.ProvisionerType{database.ProvisionerTypeEcho}
488495
acq := provisionerdserver.NewAcquirer(ctx, log, db, ps)
489-
aj, err := acq.AcquireJob(ctx, uuid.New(), ptypes, tt.acquireJobTags)
496+
aj, err := acq.AcquireJob(ctx, org.ID, uuid.New(), ptypes, tt.acquireJobTags)
490497
if tt.expectAcquire {
491498
assert.NoError(t, err)
492499
assert.Equal(t, pj.ID, aj.ID)
@@ -659,16 +666,18 @@ jobLoop:
659666
// and asserting whether or not it returns, blocks, or is canceled.
660667
type testAcquiree struct {
661668
t *testing.T
669+
orgID uuid.UUID
662670
workerID uuid.UUID
663671
pt []database.ProvisionerType
664672
tags provisionerdserver.Tags
665673
ec chan error
666674
jc chan database.ProvisionerJob
667675
}
668676

669-
func newTestAcquiree(t *testing.T, workerID uuid.UUID, pt []database.ProvisionerType, tags provisionerdserver.Tags) *testAcquiree {
677+
func newTestAcquiree(t *testing.T, orgID uuid.UUID, workerID uuid.UUID, pt []database.ProvisionerType, tags provisionerdserver.Tags) *testAcquiree {
670678
return &testAcquiree{
671679
t: t,
680+
orgID: orgID,
672681
workerID: workerID,
673682
pt: pt,
674683
tags: tags,
@@ -679,7 +688,7 @@ func newTestAcquiree(t *testing.T, workerID uuid.UUID, pt []database.Provisioner
679688

680689
func (a *testAcquiree) startAcquire(ctx context.Context, uut *provisionerdserver.Acquirer) {
681690
go func() {
682-
j, e := uut.AcquireJob(ctx, a.workerID, a.pt, a.tags)
691+
j, e := uut.AcquireJob(ctx, a.orgID, a.workerID, a.pt, a.tags)
683692
a.ec <- e
684693
a.jc <- j
685694
}()

coderd/provisionerdserver/provisionerdserver.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ func (s *server) AcquireJob(ctx context.Context, _ *proto.Empty) (*proto.Acquire
290290
// database.
291291
acqCtx, acqCancel := context.WithTimeout(ctx, s.acquireJobLongPollDur)
292292
defer acqCancel()
293-
job, err := s.Acquirer.AcquireJob(acqCtx, s.ID, s.Provisioners, s.Tags)
293+
job, err := s.Acquirer.AcquireJob(acqCtx, s.OrganizationID, s.ID, s.Provisioners, s.Tags)
294294
if xerrors.Is(err, context.DeadlineExceeded) {
295295
s.Logger.Debug(ctx, "successful cancel")
296296
return &proto.AcquiredJob{}, nil
@@ -327,7 +327,7 @@ func (s *server) AcquireJobWithCancel(stream proto.DRPCProvisionerDaemon_Acquire
327327
}()
328328
jec := make(chan jobAndErr, 1)
329329
go func() {
330-
job, err := s.Acquirer.AcquireJob(acqCtx, s.ID, s.Provisioners, s.Tags)
330+
job, err := s.Acquirer.AcquireJob(acqCtx, s.OrganizationID, s.ID, s.Provisioners, s.Tags)
331331
jec <- jobAndErr{job: job, err: err}
332332
}()
333333
var recvErr error

0 commit comments

Comments
 (0)