fix: fix TestPGCoordinatorDual_Mainline flake #8228

Merged: 2 commits, Jun 28, 2023
99 changes: 43 additions & 56 deletions enterprise/tailnet/pgcoord_test.go
@@ -14,6 +14,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
"golang.org/x/exp/slices"
"golang.org/x/xerrors"

"cdr.dev/slog"
@@ -203,11 +204,9 @@ func TestPGCoordinatorSingle_MissedHeartbeats(t *testing.T) {
client := newTestClient(t, coordinator, agent.id)
defer client.close()

nodes := client.recvNodes(ctx, t)
assertHasDERPs(t, nodes, 10)
assertEventuallyHasDERPs(ctx, t, client, 10)
client.sendNode(&agpl.Node{PreferredDERP: 11})
nodes = agent.recvNodes(ctx, t)
assertHasDERPs(t, nodes, 11)
assertEventuallyHasDERPs(ctx, t, agent, 11)

// simulate a second coordinator via DB calls only --- our goal is to test broken heart-beating, so we can't use a
// real coordinator
@@ -233,8 +232,7 @@ func TestPGCoordinatorSingle_MissedHeartbeats(t *testing.T) {
}()
fCoord2.heartbeat()
fCoord2.agentNode(agent.id, &agpl.Node{PreferredDERP: 12})
nodes = client.recvNodes(ctx, t)
assertHasDERPs(t, nodes, 12)
assertEventuallyHasDERPs(ctx, t, client, 12)

fCoord3 := &fakeCoordinator{
ctx: ctx,
@@ -245,24 +243,20 @@ func TestPGCoordinatorSingle_MissedHeartbeats(t *testing.T) {
start := time.Now()
fCoord3.heartbeat()
fCoord3.agentNode(agent.id, &agpl.Node{PreferredDERP: 13})
nodes = client.recvNodes(ctx, t)
assertHasDERPs(t, nodes, 13)
assertEventuallyHasDERPs(ctx, t, client, 13)

// when the fCoord3 misses enough heartbeats, the real coordinator should send an update with the
// node from fCoord2 for the agent.
nodes = client.recvNodes(ctx, t)
assertEventuallyHasDERPs(ctx, t, client, 12)
assert.Greater(t, time.Since(start), tailnet.HeartbeatPeriod*tailnet.MissedHeartbeats)
assertHasDERPs(t, nodes, 12)

// stop fCoord2 heartbeats, which should cause us to revert to the original agent mapping
cancel2()
nodes = client.recvNodes(ctx, t)
assertHasDERPs(t, nodes, 10)
assertEventuallyHasDERPs(ctx, t, client, 10)

// send fCoord3 heartbeat, which should trigger us to consider that mapping valid again.
fCoord3.heartbeat()
nodes = client.recvNodes(ctx, t)
assertHasDERPs(t, nodes, 13)
assertEventuallyHasDERPs(ctx, t, client, 13)

err = agent.close()
require.NoError(t, err)
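
The hunk above relies on the coordinator discarding mappings from a peer coordinator once it has missed enough heartbeats, and the test asserts that this takes at least tailnet.HeartbeatPeriod * tailnet.MissedHeartbeats. A minimal sketch of that staleness rule, with hypothetical constant values standing in for the real tailnet package definitions:

package main

import (
	"fmt"
	"time"
)

// Hypothetical stand-ins for tailnet.HeartbeatPeriod and tailnet.MissedHeartbeats;
// the real values are defined in the tailnet package.
const (
	heartbeatPeriod  = 2 * time.Second
	missedHeartbeats = 3
)

// expired reports whether a coordinator whose last heartbeat was at `last`
// should be considered dead at `now`, meaning its agent mappings are ignored.
func expired(last, now time.Time) bool {
	return now.Sub(last) > heartbeatPeriod*missedHeartbeats
}

func main() {
	last := time.Now().Add(-7 * time.Second)
	fmt.Println(expired(last, time.Now())) // true: 7s since the last beat > 3 * 2s
}
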
@@ -358,33 +352,24 @@ func TestPGCoordinatorDual_Mainline(t *testing.T) {
defer client22.close()

client11.sendNode(&agpl.Node{PreferredDERP: 11})
nodes := agent1.recvNodes(ctx, t)
assert.Len(t, nodes, 1)
assertHasDERPs(t, nodes, 11)
assertEventuallyHasDERPs(ctx, t, agent1, 11)

client21.sendNode(&agpl.Node{PreferredDERP: 21})
nodes = agent1.recvNodes(ctx, t)
assertHasDERPs(t, nodes, 21, 11)
assertEventuallyHasDERPs(ctx, t, agent1, 21, 11)

client22.sendNode(&agpl.Node{PreferredDERP: 22})
nodes = agent2.recvNodes(ctx, t)
assertHasDERPs(t, nodes, 22)
assertEventuallyHasDERPs(ctx, t, agent2, 22)

agent2.sendNode(&agpl.Node{PreferredDERP: 2})
nodes = client22.recvNodes(ctx, t)
assertHasDERPs(t, nodes, 2)
nodes = client12.recvNodes(ctx, t)
assertHasDERPs(t, nodes, 2)
assertEventuallyHasDERPs(ctx, t, client22, 2)
assertEventuallyHasDERPs(ctx, t, client12, 2)

client12.sendNode(&agpl.Node{PreferredDERP: 12})
nodes = agent2.recvNodes(ctx, t)
assertHasDERPs(t, nodes, 12, 22)
assertEventuallyHasDERPs(ctx, t, agent2, 12, 22)

agent1.sendNode(&agpl.Node{PreferredDERP: 1})
nodes = client21.recvNodes(ctx, t)
assertHasDERPs(t, nodes, 1)
nodes = client11.recvNodes(ctx, t)
assertHasDERPs(t, nodes, 1)
assertEventuallyHasDERPs(ctx, t, client21, 1)
assertEventuallyHasDERPs(ctx, t, client11, 1)

// let's close coord2
err = coord2.Close()
@@ -402,8 +387,7 @@ func TestPGCoordinatorDual_Mainline(t *testing.T) {
// In this case the update is superfluous because client11's node hasn't changed, and agents don't deprogram clients
// from the dataplane even if they are missing. Suppressing this kind of update would require the coordinator to
// store all the data it's sent to each connection, so we don't bother.
nodes = agent1.recvNodes(ctx, t)
assertHasDERPs(t, nodes, 11)
assertEventuallyHasDERPs(ctx, t, agent1, 11)

// note that although agent2 is disconnected, client12 does NOT get an update because we suppress empty updates.
// (It's easy to tell these are superfluous.)
@@ -492,36 +476,29 @@ func TestPGCoordinator_MultiAgent(t *testing.T) {
defer client.close()

client.sendNode(&agpl.Node{PreferredDERP: 3})
nodes := agent1.recvNodes(ctx, t)
assertHasDERPs(t, nodes, 3)
nodes = agent2.recvNodes(ctx, t)
assertHasDERPs(t, nodes, 3)
assertEventuallyHasDERPs(ctx, t, agent1, 3)
assertEventuallyHasDERPs(ctx, t, agent2, 3)

agent1.sendNode(&agpl.Node{PreferredDERP: 1})
nodes = client.recvNodes(ctx, t)
assertHasDERPs(t, nodes, 1)
assertEventuallyHasDERPs(ctx, t, client, 1)

// agent2's update overrides agent1 because it is newer
agent2.sendNode(&agpl.Node{PreferredDERP: 2})
nodes = client.recvNodes(ctx, t)
assertHasDERPs(t, nodes, 2)
assertEventuallyHasDERPs(ctx, t, client, 2)

// agent2 disconnects, and we should revert back to agent1
err = agent2.close()
require.NoError(t, err)
err = agent2.recvErr(ctx, t)
require.ErrorIs(t, err, io.ErrClosedPipe)
agent2.waitForClose(ctx, t)
nodes = client.recvNodes(ctx, t)
assertHasDERPs(t, nodes, 1)
assertEventuallyHasDERPs(ctx, t, client, 1)

agent1.sendNode(&agpl.Node{PreferredDERP: 11})
nodes = client.recvNodes(ctx, t)
assertHasDERPs(t, nodes, 11)
assertEventuallyHasDERPs(ctx, t, client, 11)

client.sendNode(&agpl.Node{PreferredDERP: 31})
nodes = agent1.recvNodes(ctx, t)
assertHasDERPs(t, nodes, 31)
assertEventuallyHasDERPs(ctx, t, agent1, 31)

err = agent1.close()
require.NoError(t, err)
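
The MultiAgent test above exercises the rule that when several connections exist for the same agent, the most recently updated node wins, and the coordinator falls back to the surviving connection when the newer one disconnects. A rough sketch of that selection rule over simplified rows; agentRow and newest are illustrative, not the actual database model or coordinator code:

package main

import (
	"fmt"
	"time"
)

// agentRow is a hypothetical, simplified per-connection agent mapping:
// which DERP the node advertises and when it was last updated.
type agentRow struct {
	PreferredDERP int
	UpdatedAt     time.Time
}

// newest picks the most recently updated mapping; callers would first drop
// rows belonging to disconnected or expired coordinators.
func newest(rows []agentRow) agentRow {
	best := rows[0]
	for _, r := range rows[1:] {
		if r.UpdatedAt.After(best.UpdatedAt) {
			best = r
		}
	}
	return best
}

func main() {
	now := time.Now()
	rows := []agentRow{
		{PreferredDERP: 1, UpdatedAt: now.Add(-time.Minute)},
		{PreferredDERP: 2, UpdatedAt: now}, // newer, so it wins
	}
	fmt.Println(newest(rows).PreferredDERP) // 2
}
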
@@ -625,17 +602,27 @@ func newTestClient(t *testing.T, coord agpl.Coordinator, agentID uuid.UUID, id .
return c
}

func assertHasDERPs(t *testing.T, nodes []*agpl.Node, expected ...int) {
if !assert.Len(t, nodes, len(expected), "expected %d node(s), got %d", len(expected), len(nodes)) {
func assertEventuallyHasDERPs(ctx context.Context, t *testing.T, c *testConn, expected ...int) {
t.Helper()
for {
nodes := c.recvNodes(ctx, t)
if len(nodes) != len(expected) {
t.Logf("expected %d, got %d nodes", len(expected), len(nodes))
continue
}

derps := make([]int, 0, len(nodes))
for _, n := range nodes {
derps = append(derps, n.PreferredDERP)
}
for _, e := range expected {
if !slices.Contains(derps, e) {
t.Logf("expected DERP %d to be in %v", e, derps)
continue
}
}
return
}
derps := make([]int, 0, len(nodes))
for _, n := range nodes {
derps = append(derps, n.PreferredDERP)
}
for _, e := range expected {
assert.Contains(t, derps, e, "expected DERP %v, got %v", e, derps)
}
}

func assertEventuallyNoAgents(ctx context.Context, t *testing.T, store database.Store, agentID uuid.UUID) {
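The last hunk above contains the heart of the flake fix: one-shot recvNodes plus assertHasDERPs checks become assertEventuallyHasDERPs, which keeps reading node updates until the expected DERPs appear, relying on recvNodes and the test context to cut the loop short on timeout. A stripped-down sketch of that polling pattern, with hypothetical recv and match callbacks standing in for testConn.recvNodes and the DERP comparison:

package sketch

import (
	"context"
	"testing"
)

// assertEventually polls recv until match reports success. recv is expected
// to fail the test itself when ctx expires, mirroring how testConn.recvNodes
// behaves; both callbacks are hypothetical stand-ins, not the PR's helpers.
func assertEventually(ctx context.Context, t *testing.T, recv func(context.Context, *testing.T) []int, match func([]int) bool) {
	t.Helper()
	for {
		got := recv(ctx, t)
		if match(got) {
			return
		}
		t.Logf("not matched yet, got %v; waiting for the next update", got)
	}
}

Because recv enforces the context deadline, the loop cannot spin forever: it either observes the expected state or the test fails with a timeout, which is what makes the assertion tolerant of intermediate updates.
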
7 changes: 4 additions & 3 deletions tailnet/coordinator.go
@@ -227,7 +227,7 @@ func (t *TrackedConn) SendUpdates() {
return
}
if bytes.Equal(t.lastData, data) {
t.logger.Debug(t.ctx, "skipping duplicate update", slog.F("nodes", nodes))
t.logger.Debug(t.ctx, "skipping duplicate update", slog.F("nodes", string(data)))
continue
}

@@ -243,11 +243,12 @@ func (t *TrackedConn) SendUpdates() {
_, err = t.conn.Write(data)
if err != nil {
// often, this is just because the connection is closed/broken, so only log at debug.
t.logger.Debug(t.ctx, "could not write nodes to connection", slog.Error(err), slog.F("nodes", nodes))
t.logger.Debug(t.ctx, "could not write nodes to connection",
slog.Error(err), slog.F("nodes", string(data)))
_ = t.Close()
return
}
t.logger.Debug(t.ctx, "wrote nodes", slog.F("nodes", nodes))
t.logger.Debug(t.ctx, "wrote nodes", slog.F("nodes", string(data)))

// nhooyr.io/websocket has a bugged implementation of deadlines on a websocket net.Conn. What they are
// *supposed* to do is set a deadline for any subsequent writes to complete, otherwise the call to Write()
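
For orientation on the tailnet/coordinator.go hunks: SendUpdates serializes the node set, skips the write when the bytes match the previously sent payload, and with this PR logs the serialized payload (string(data)) rather than the node structs. A simplified sketch of that serialize, deduplicate, then write step; sendUpdate, node, and their shapes are illustrative, not the real TrackedConn method:

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
)

// node is a minimal, hypothetical stand-in for agpl.Node.
type node struct {
	PreferredDERP int `json:"preferred_derp"`
}

// sendUpdate marshals nodes, skips the write if the payload is identical to
// the last one sent, and returns the payload to remember as lastData. The
// real TrackedConn.SendUpdates also manages write deadlines and closes the
// connection on write errors.
func sendUpdate(w io.Writer, lastData []byte, nodes []node) ([]byte, error) {
	data, err := json.Marshal(nodes)
	if err != nil {
		return lastData, err
	}
	if bytes.Equal(lastData, data) {
		fmt.Printf("skipping duplicate update: %s\n", data)
		return lastData, nil
	}
	if _, err := w.Write(data); err != nil {
		fmt.Printf("could not write nodes: %v (payload: %s)\n", err, data)
		return lastData, err
	}
	fmt.Printf("wrote nodes: %s\n", data)
	return data, nil
}

func main() {
	var buf bytes.Buffer
	last, _ := sendUpdate(&buf, nil, []node{{PreferredDERP: 11}})
	_, _ = sendUpdate(&buf, last, []node{{PreferredDERP: 11}}) // identical payload, skipped
}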