
Commit 764b9b2

feat(scaletest): replace bash with dd in ssh/rpty traffic and use pseudorandomness
Fixes #10795 Refs #8556
1 parent 4f92928 commit 764b9b2
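The command string this commit feeds into the SSH and reconnecting-PTY connections is built elsewhere in the scaletest package and is not part of the diffs shown below. As a rough sketch of the idea in the commit title — replacing an interactive bash shell with a dd pipeline fed by seeded pseudorandom data — the client side might look something like the following; the dd flags and the seed value are illustrative assumptions, not code from this commit:

package main

import (
	"fmt"
	"io"
	"math/rand"
)

func main() {
	// Illustrative only: a dd pipeline on the agent side can discard
	// (of=/dev/null) or echo back (of=/dev/stdout) whatever it is fed, without
	// the quoting and prompt noise of an interactive bash shell. The exact
	// flags used by the scaletest runner are not shown in this commit excerpt.
	remoteCmd := "dd if=/dev/stdin of=/dev/null bs=1024 status=none"

	// Seeded pseudorandom payloads are reproducible across runs, unlike
	// reading from /dev/urandom on the client.
	src := rand.New(rand.NewSource(42)) // *rand.Rand implements io.Reader
	payload := make([]byte, 1024)
	if _, err := io.ReadFull(src, payload); err != nil {
		panic(err)
	}
	fmt.Printf("would run %q and send %d pseudorandom bytes\n", remoteCmd, len(payload))
}

A seeded *rand.Rand keeps the generated traffic reproducible between runs, which helps when comparing scaletest results.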

6 files changed: 324 additions, 145 deletions


cli/exp_scaletest.go

Lines changed: 19 additions & 5 deletions

@@ -10,6 +10,7 @@ import (
 	"math/rand"
 	"net/http"
 	"os"
+	"os/signal"
 	"strconv"
 	"strings"
 	"sync"
@@ -173,11 +174,12 @@ func (s *scaletestStrategyFlags) attach(opts *clibase.OptionSet) {
 
 func (s *scaletestStrategyFlags) toStrategy() harness.ExecutionStrategy {
 	var strategy harness.ExecutionStrategy
-	if s.concurrency == 1 {
+	switch s.concurrency {
+	case 1:
 		strategy = harness.LinearExecutionStrategy{}
-	} else if s.concurrency == 0 {
+	case 0:
 		strategy = harness.ConcurrentExecutionStrategy{}
-	} else {
+	default:
 		strategy = harness.ParallelExecutionStrategy{
 			Limit: int(s.concurrency),
 		}
@@ -244,7 +246,9 @@ func (o *scaleTestOutput) write(res harness.Results, stdout io.Writer) error {
 		err := s.Sync()
 		// On Linux, EINVAL is returned when calling fsync on /dev/stdout. We
 		// can safely ignore this error.
-		if err != nil && !xerrors.Is(err, syscall.EINVAL) {
+		// On macOS, ENOTTY is returned when calling sync on /dev/stdout. We
+		// can safely ignore this error.
+		if err != nil && !xerrors.Is(err, syscall.EINVAL) && !xerrors.Is(err, syscall.ENOTTY) {
 			return xerrors.Errorf("flush output file: %w", err)
 		}
 	}
@@ -871,9 +875,13 @@ func (r *RootCmd) scaletestWorkspaceTraffic() *clibase.Cmd {
 		Middleware: clibase.Chain(
 			r.InitClient(client),
 		),
-		Handler: func(inv *clibase.Invocation) error {
+		Handler: func(inv *clibase.Invocation) (err error) {
 			ctx := inv.Context()
 
+			notifyCtx, stop := signal.NotifyContext(ctx, InterruptSignals...) // Checked later.
+			defer stop()
+			ctx = notifyCtx
+
 			me, err := requireAdmin(ctx, client)
 			if err != nil {
 				return err
@@ -965,6 +973,7 @@ func (r *RootCmd) scaletestWorkspaceTraffic() *clibase.Cmd {
 				ReadMetrics:  metrics.ReadMetrics(ws.OwnerName, ws.Name, agentName),
 				WriteMetrics: metrics.WriteMetrics(ws.OwnerName, ws.Name, agentName),
 				SSH:          ssh,
+				Echo:         ssh,
 			}
 
 			if err := config.Validate(); err != nil {
@@ -990,6 +999,11 @@ func (r *RootCmd) scaletestWorkspaceTraffic() *clibase.Cmd {
 				return xerrors.Errorf("run test harness (harness failure, not a test failure): %w", err)
 			}
 
+			// If the command was interrupted, skip stats.
+			if notifyCtx.Err() != nil {
+				return notifyCtx.Err()
+			}
+
 			res := th.Results()
 			for _, o := range outputs {
 				err = o.write(res, inv.Stdout)
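The Handler changes above route interrupt signals through the context so that Ctrl+C cancels the run and the stats output is skipped. A minimal, self-contained sketch of that pattern, substituting os.Interrupt for the CLI's InterruptSignals list:

package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"time"
)

func main() {
	// Derive a context that is canceled on SIGINT; stop() releases the signal
	// registration once we are done.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
	defer stop()

	// Stand-in for running the test harness.
	select {
	case <-time.After(2 * time.Second):
	case <-ctx.Done():
	}

	// If the command was interrupted, skip reporting, mirroring the
	// notifyCtx.Err() check in the diff above.
	if ctx.Err() != nil {
		fmt.Println("interrupted, skipping stats:", ctx.Err())
		return
	}
	fmt.Println("writing stats")
}

signal.NotifyContext cancels the derived context on the first matching signal; checking ctx.Err() afterwards distinguishes an interrupted run from a completed one.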

scaletest/workspacetraffic/config.go

Lines changed: 6 additions & 0 deletions

@@ -25,6 +25,12 @@ type Config struct {
 	WriteMetrics ConnMetrics `json:"-"`
 
 	SSH bool `json:"ssh"`
+
+	// Echo controls whether the agent should echo the data it receives.
+	// If false, the agent will discard the data. Note that setting this
+	// to true will double the amount of data read from the agent for
+	// PTYs (e.g. reconnecting pty or SSH connections that request PTY).
+	Echo bool `json:"echo"`
 }
 
 func (c Config) Validate() error {
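For reference, the CLI diff earlier in this commit ties Echo to the existing SSH flag (Echo: ssh), so echoing is only exercised over SSH. A tiny sketch of that wiring with a stand-in struct (the real workspacetraffic.Config has more required fields, e.g. the metrics hooks shown above):

package main

import "fmt"

// trafficConfig is a stand-in for workspacetraffic.Config, reduced to the two
// fields relevant here.
type trafficConfig struct {
	SSH  bool `json:"ssh"`
	Echo bool `json:"echo"`
}

func main() {
	ssh := true // stand-in for the CLI's SSH flag
	// Mirrors the wiring in cli/exp_scaletest.go: echo only over SSH, since
	// echoing a PTY doubles the bytes read back from the agent.
	cfg := trafficConfig{SSH: ssh, Echo: ssh}
	fmt.Printf("ssh=%v echo=%v\n", cfg.SSH, cfg.Echo)
}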

scaletest/workspacetraffic/conn.go

Lines changed: 181 additions & 43 deletions

@@ -2,122 +2,260 @@ package workspacetraffic
 
 import (
 	"context"
+	"encoding/json"
+	"errors"
 	"io"
 	"sync"
+	"time"
 
 	"github.com/coder/coder/v2/codersdk"
 
 	"github.com/google/uuid"
-	"github.com/hashicorp/go-multierror"
 	gossh "golang.org/x/crypto/ssh"
 	"golang.org/x/xerrors"
 )
 
-func connectPTY(ctx context.Context, client *codersdk.Client, agentID, reconnect uuid.UUID) (*countReadWriteCloser, error) {
+const (
+	// Set a timeout for graceful close of the connection.
+	connCloseTimeout = 30 * time.Second
+	// Set a timeout for waiting for the connection to close.
+	waitCloseTimeout = connCloseTimeout + 5*time.Second
+
+	// In theory, we can send larger payloads to push bandwidth, but we need to
+	// be careful not to send too much data at once or the server will close the
+	// connection. We see this more readily as our JSON payloads approach 28KB.
+	//
+	// failed to write frame: WebSocket closed: received close frame: status = StatusMessageTooBig and reason = "read limited at 32769 bytes"
+	//
+	// Since we can't control fragmentation/buffer sizes, we keep it simple and
+	// match the conservative payload size used by agent/reconnectingpty (1024).
+	rptyJSONMaxDataSize = 1024
+)
+
+func connectRPTY(ctx context.Context, client *codersdk.Client, agentID, reconnect uuid.UUID, cmd string) (*countReadWriteCloser, error) {
+	width, height := 80, 25
 	conn, err := client.WorkspaceAgentReconnectingPTY(ctx, codersdk.WorkspaceAgentReconnectingPTYOpts{
 		AgentID:   agentID,
 		Reconnect: reconnect,
-		Height:    25,
-		Width:     80,
-		Command:   "sh",
+		Width:     uint16(width),
+		Height:    uint16(height),
+		Command:   cmd,
 	})
 	if err != nil {
 		return nil, xerrors.Errorf("connect pty: %w", err)
 	}
 
 	// Wrap the conn in a countReadWriteCloser so we can monitor bytes sent/rcvd.
-	crw := countReadWriteCloser{ctx: ctx, rwc: conn}
+	crw := countReadWriteCloser{rwc: newPTYConn(conn)}
 	return &crw, nil
 }
 
-func connectSSH(ctx context.Context, client *codersdk.Client, agentID uuid.UUID) (*countReadWriteCloser, error) {
+type rptyConn struct {
+	conn io.ReadWriteCloser
+	wenc *json.Encoder
+
+	mu       sync.Mutex
+	closed   bool
+	readErr  chan error
+	readOnce sync.Once
+}
+
+func newPTYConn(conn io.ReadWriteCloser) *rptyConn {
+	rc := &rptyConn{
+		conn:    conn,
+		wenc:    json.NewEncoder(conn),
+		readErr: make(chan error, 1),
+	}
+	return rc
+}
+
+func (c *rptyConn) Read(p []byte) (int, error) {
+	n, err := c.conn.Read(p)
+	if err != nil {
+		c.readOnce.Do(func() {
+			c.readErr <- err
+			close(c.readErr)
+		})
+		return n, err
+	}
+	return n, nil
+}
+
+func (c *rptyConn) Write(p []byte) (int, error) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	// Early exit in case we're closing, this is to let the close call write
+	// Ctrl+C without a flood of other writes.
+	if c.closed {
+		return 0, io.EOF
+	}
+
+	return c.writeNoLock(p)
+}
+
+func (c *rptyConn) writeNoLock(p []byte) (n int, err error) {
+	// If we try to send more than the max payload size, the server will close the connection.
+	for len(p) > 0 {
+		pp := p
+		if len(pp) > rptyJSONMaxDataSize {
+			pp = p[:rptyJSONMaxDataSize]
+		}
+		p = p[len(pp):]
+		req := codersdk.ReconnectingPTYRequest{Data: string(pp)}
+		if err := c.wenc.Encode(req); err != nil {
+			return n, xerrors.Errorf("encode pty request: %w", err)
+		}
+		n += len(pp)
+	}
+	return n, nil
+}
+
+func (c *rptyConn) Close() (err error) {
+	c.mu.Lock()
+	if c.closed {
+		c.mu.Unlock()
+		return nil
+	}
+	c.closed = true
+	c.mu.Unlock()
+
+	defer c.conn.Close()
+
+	// Send Ctrl+C to interrupt the command.
+	_, err = c.writeNoLock([]byte("\u0003"))
+	if err != nil {
+		return xerrors.Errorf("write ctrl+c: %w", err)
+	}
+	select {
+	case <-time.After(connCloseTimeout):
+		return xerrors.Errorf("timeout waiting for read to finish")
+	case err = <-c.readErr:
+		if errors.Is(err, io.EOF) {
+			return nil
+		}
+		return err
+	}
+}
+
+//nolint:revive // Ignore requestPTY control flag.
+func connectSSH(ctx context.Context, client *codersdk.Client, agentID uuid.UUID, cmd string, requestPTY bool) (rwc *countReadWriteCloser, err error) {
+	var closers []func() error
+	defer func() {
+		if err != nil {
+			for _, c := range closers {
+				if err2 := c(); err2 != nil {
+					err = errors.Join(err, err2)
+				}
+			}
+		}
+	}()
+
 	agentConn, err := client.DialWorkspaceAgent(ctx, agentID, &codersdk.DialWorkspaceAgentOptions{})
 	if err != nil {
 		return nil, xerrors.Errorf("dial workspace agent: %w", err)
 	}
-	agentConn.AwaitReachable(ctx)
+	closers = append(closers, agentConn.Close)
+
 	sshClient, err := agentConn.SSHClient(ctx)
 	if err != nil {
 		return nil, xerrors.Errorf("get ssh client: %w", err)
 	}
+	closers = append(closers, sshClient.Close)
+
 	sshSession, err := sshClient.NewSession()
 	if err != nil {
-		_ = agentConn.Close()
 		return nil, xerrors.Errorf("new ssh session: %w", err)
 	}
-	wrappedConn := &wrappedSSHConn{ctx: ctx}
+	closers = append(closers, sshSession.Close)
+
+	wrappedConn := &wrappedSSHConn{}
+
 	// Do some plumbing to hook up the wrappedConn
 	pr1, pw1 := io.Pipe()
+	closers = append(closers, pr1.Close, pw1.Close)
 	wrappedConn.stdout = pr1
 	sshSession.Stdout = pw1
+
 	pr2, pw2 := io.Pipe()
+	closers = append(closers, pr2.Close, pw2.Close)
 	sshSession.Stdin = pr2
 	wrappedConn.stdin = pw2
-	err = sshSession.RequestPty("xterm", 25, 80, gossh.TerminalModes{})
-	if err != nil {
-		_ = pr1.Close()
-		_ = pr2.Close()
-		_ = pw1.Close()
-		_ = pw2.Close()
-		_ = sshSession.Close()
-		_ = agentConn.Close()
-		return nil, xerrors.Errorf("request pty: %w", err)
+
+	if requestPTY {
+		err = sshSession.RequestPty("xterm", 25, 80, gossh.TerminalModes{})
+		if err != nil {
+			return nil, xerrors.Errorf("request pty: %w", err)
+		}
 	}
-	err = sshSession.Shell()
+	err = sshSession.Start(cmd)
 	if err != nil {
-		_ = sshSession.Close()
-		_ = agentConn.Close()
 		return nil, xerrors.Errorf("shell: %w", err)
 	}
+	waitErr := make(chan error, 1)
+	go func() {
+		waitErr <- sshSession.Wait()
+	}()
 
 	closeFn := func() error {
-		var merr error
-		if err := sshSession.Close(); err != nil {
-			merr = multierror.Append(merr, err)
+		// Start by closing stdin so we stop writing to the ssh session.
+		merr := pw2.Close()
+		if err := sshSession.Signal(gossh.SIGHUP); err != nil {
+			merr = errors.Join(merr, err)
 		}
-		if err := agentConn.Close(); err != nil {
-			merr = multierror.Append(merr, err)
+		select {
+		case <-time.After(connCloseTimeout):
+			merr = errors.Join(merr, xerrors.Errorf("timeout waiting for ssh session to close"))
+		case err := <-waitErr:
+			if err != nil {
+				var exitErr *gossh.ExitError
+				if xerrors.As(err, &exitErr) {
+					// The exit status is 255 when the command is
+					// interrupted by a signal. This is expected.
+					if exitErr.ExitStatus() != 255 {
+						merr = errors.Join(merr, xerrors.Errorf("ssh session exited with unexpected status: %d", int32(exitErr.ExitStatus())))
+					}
+				} else {
+					merr = errors.Join(merr, err)
+				}
+			}
+		}
+		for _, c := range closers {
+			if err := c(); err != nil {
+				if !errors.Is(err, io.EOF) {
+					merr = errors.Join(merr, err)
+				}
+			}
 		}
 		return merr
 	}
 	wrappedConn.close = closeFn
 
-	crw := &countReadWriteCloser{ctx: ctx, rwc: wrappedConn}
+	crw := &countReadWriteCloser{rwc: wrappedConn}
+
 	return crw, nil
 }
 
 // wrappedSSHConn wraps an ssh.Session to implement io.ReadWriteCloser.
 type wrappedSSHConn struct {
-	ctx       context.Context
 	stdout    io.Reader
-	stdin     io.Writer
+	stdin     io.WriteCloser
 	closeOnce sync.Once
 	closeErr  error
 	close     func() error
 }
 
 func (w *wrappedSSHConn) Close() error {
 	w.closeOnce.Do(func() {
-		_, _ = w.stdin.Write([]byte("exit\n"))
 		w.closeErr = w.close()
 	})
 	return w.closeErr
 }
 
 func (w *wrappedSSHConn) Read(p []byte) (n int, err error) {
-	select {
-	case <-w.ctx.Done():
-		return 0, xerrors.Errorf("read: %w", w.ctx.Err())
-	default:
-		return w.stdout.Read(p)
-	}
+	return w.stdout.Read(p)
 }
 
 func (w *wrappedSSHConn) Write(p []byte) (n int, err error) {
-	select {
-	case <-w.ctx.Done():
-		return 0, xerrors.Errorf("write: %w", w.ctx.Err())
-	default:
-		return w.stdin.Write(p)
-	}
+	return w.stdin.Write(p)
 }
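The writeNoLock method above caps each reconnecting-PTY frame at rptyJSONMaxDataSize bytes so the JSON payloads stay well under the server's 32 KiB websocket read limit mentioned in the comment. A standalone sketch of that chunking pattern, encoding into a buffer instead of a live connection and using a reduced stand-in for codersdk.ReconnectingPTYRequest:

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// ptyRequest stands in for codersdk.ReconnectingPTYRequest, reduced to Data.
type ptyRequest struct {
	Data string `json:"data"`
}

const maxDataSize = 1024 // mirrors rptyJSONMaxDataSize in the diff

// writeChunked encodes p as a sequence of JSON frames, each carrying at most
// maxDataSize payload bytes, and reports how many payload bytes were written.
func writeChunked(enc *json.Encoder, p []byte) (int, error) {
	n := 0
	for len(p) > 0 {
		pp := p
		if len(pp) > maxDataSize {
			pp = p[:maxDataSize]
		}
		p = p[len(pp):]
		if err := enc.Encode(ptyRequest{Data: string(pp)}); err != nil {
			return n, err
		}
		n += len(pp)
	}
	return n, nil
}

func main() {
	var buf bytes.Buffer
	n, err := writeChunked(json.NewEncoder(&buf), bytes.Repeat([]byte("x"), 2500))
	if err != nil {
		panic(err)
	}
	// Encode appends a newline per frame, so counting newlines counts frames.
	fmt.Println("payload bytes:", n, "frames:", bytes.Count(buf.Bytes(), []byte("\n")))
}

Running this prints "payload bytes: 2500 frames: 3", i.e. two full 1024-byte frames plus a 452-byte remainder.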

scaletest/workspacetraffic/countreadwriter.go

Lines changed: 0 additions & 1 deletion

@@ -13,7 +13,6 @@ import (
 
 // countReadWriteCloser wraps an io.ReadWriteCloser and counts the number of bytes read and written.
 type countReadWriteCloser struct {
-	ctx          context.Context
 	rwc          io.ReadWriteCloser
 	readMetrics  ConnMetrics
 	writeMetrics ConnMetrics
