Skip to content

Commit ce96564

Browse files
committed
feat(enterprise): add ready for handshake support to pgcoord
1 parent e801e87 commit ce96564

File tree

14 files changed

+442
-33
lines changed

14 files changed

+442
-33
lines changed

coderd/database/db.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ func (q *sqlQuerier) InTx(function func(Store) error, txOpts *sql.TxOptions) err
103103
// Transaction succeeded.
104104
return nil
105105
}
106-
if err != nil && !IsSerializedError(err) {
106+
if !IsSerializedError(err) {
107107
// We should only retry if the error is a serialization error.
108108
return err
109109
}

coderd/database/dbauthz/dbauthz.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2645,6 +2645,13 @@ func (q *querier) ListWorkspaceAgentPortShares(ctx context.Context, workspaceID
26452645
return q.db.ListWorkspaceAgentPortShares(ctx, workspaceID)
26462646
}
26472647

2648+
func (q *querier) PublishReadyForHandshake(ctx context.Context, arg database.PublishReadyForHandshakeParams) error {
2649+
if err := q.authorizeContext(ctx, rbac.ActionRead, rbac.ResourceTailnetCoordinator); err != nil {
2650+
return err
2651+
}
2652+
return q.db.PublishReadyForHandshake(ctx, arg)
2653+
}
2654+
26482655
func (q *querier) ReduceWorkspaceAgentShareLevelToAuthenticatedByTemplate(ctx context.Context, templateID uuid.UUID) error {
26492656
template, err := q.db.GetTemplateByID(ctx, templateID)
26502657
if err != nil {

coderd/database/dbauthz/dbauthz_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1829,6 +1829,11 @@ func (s *MethodTestSuite) TestTailnetFunctions() {
18291829
Asserts(rbac.ResourceTailnetCoordinator, rbac.ActionCreate).
18301830
Errors(dbmem.ErrUnimplemented)
18311831
}))
1832+
s.Run("PublishReadyForHandshake", s.Subtest(func(db database.Store, check *expects) {
1833+
check.Args(database.PublishReadyForHandshakeParams{}).
1834+
Asserts(rbac.ResourceTailnetCoordinator, rbac.ActionUpdate).
1835+
Errors(dbmem.ErrUnimplemented)
1836+
}))
18321837
}
18331838

18341839
func (s *MethodTestSuite) TestDBCrypt() {

coderd/database/dbmem/dbmem.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6742,6 +6742,10 @@ func (q *FakeQuerier) ListWorkspaceAgentPortShares(_ context.Context, workspaceI
67426742
return shares, nil
67436743
}
67446744

6745+
func (*FakeQuerier) PublishReadyForHandshake(context.Context, database.PublishReadyForHandshakeParams) error {
6746+
return ErrUnimplemented
6747+
}
6748+
67456749
func (q *FakeQuerier) ReduceWorkspaceAgentShareLevelToAuthenticatedByTemplate(_ context.Context, templateID uuid.UUID) error {
67466750
err := validateDatabaseType(templateID)
67476751
if err != nil {

coderd/database/dbmetrics/dbmetrics.go

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/dbmock/dbmock.go

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/querier.go

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/queries.sql.go

Lines changed: 17 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/queries/tailnet.sql

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,12 @@ FROM tailnet_tunnels
207207
INNER JOIN tailnet_peers ON tailnet_tunnels.src_id = tailnet_peers.id
208208
WHERE tailnet_tunnels.dst_id = $1;
209209

210+
-- name: PublishReadyForHandshake :exec
211+
SELECT pg_notify(
212+
'tailnet_ready_for_handshake',
213+
format('%s,%s', sqlc.arg('to')::text, sqlc.arg('from')::text)
214+
);
215+
210216
-- For PG Coordinator HTMLDebug
211217

212218
-- name: GetAllTailnetCoordinators :many

enterprise/tailnet/connio.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ type connIO struct {
3030
responses chan<- *proto.CoordinateResponse
3131
bindings chan<- binding
3232
tunnels chan<- tunnel
33+
rfhs chan<- readyForHandshake
3334
auth agpl.CoordinateeAuth
3435
mu sync.Mutex
3536
closed bool
@@ -46,6 +47,7 @@ func newConnIO(coordContext context.Context,
4647
logger slog.Logger,
4748
bindings chan<- binding,
4849
tunnels chan<- tunnel,
50+
rfhs chan<- readyForHandshake,
4951
requests <-chan *proto.CoordinateRequest,
5052
responses chan<- *proto.CoordinateResponse,
5153
id uuid.UUID,
@@ -64,6 +66,7 @@ func newConnIO(coordContext context.Context,
6466
responses: responses,
6567
bindings: bindings,
6668
tunnels: tunnels,
69+
rfhs: rfhs,
6770
auth: auth,
6871
name: name,
6972
start: now,
@@ -190,6 +193,26 @@ func (c *connIO) handleRequest(req *proto.CoordinateRequest) error {
190193
c.disconnected = true
191194
return errDisconnect
192195
}
196+
if req.ReadyForHandshake != nil {
197+
c.logger.Debug(c.peerCtx, "got ready for handshake ", slog.F("rfh", req.ReadyForHandshake))
198+
for _, rfh := range req.ReadyForHandshake {
199+
dst, err := uuid.FromBytes(rfh.Id)
200+
if err != nil {
201+
c.logger.Error(c.peerCtx, "unable to convert bytes to UUID", slog.Error(err))
202+
// this shouldn't happen unless there is a client error. Close the connection so the client
203+
// doesn't just happily continue thinking everything is fine.
204+
return err
205+
}
206+
207+
if err := agpl.SendCtx(c.coordCtx, c.rfhs, readyForHandshake{hKey: hKey{
208+
src: c.id,
209+
dst: dst,
210+
}}); err != nil {
211+
c.logger.Debug(c.peerCtx, "failed to send ready for handshake", slog.Error(err))
212+
return err
213+
}
214+
}
215+
}
193216
return nil
194217
}
195218

0 commit comments

Comments
 (0)