Skip to content

fix: use screen for reconnecting terminal sessions if available #8640

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 30 commits into from
Aug 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
0b8499d
Add screen backend for reconnecting ptys
code-asher Jul 18, 2023
9cb043c
Fix leaking goroutine in wait
code-asher Aug 8, 2023
19633c9
Remove connection_id from reconnecting PTY
code-asher Aug 8, 2023
8f4956f
Remove error from close return
code-asher Aug 8, 2023
ee887b9
Refactor reconnecting PTY backends
code-asher Aug 8, 2023
1697070
Merge remote-tracking branch 'github/main' into asher/reconnection-wi…
code-asher Aug 8, 2023
a2149fc
Remove extra mutex unlock
code-asher Aug 8, 2023
e3c808b
Fix heartbeat typo in comment
code-asher Aug 8, 2023
369a36e
Tweak connection close
code-asher Aug 8, 2023
72d405c
Linter fixes
code-asher Aug 8, 2023
e37fc0f
Clear active conns on close
code-asher Aug 9, 2023
61a4253
Avoid useless buffer reset on close
code-asher Aug 9, 2023
c7978db
Move lifecycle after buffer and process are set
code-asher Aug 9, 2023
a083b31
Add info logs for starting, stopping, and attaching
code-asher Aug 9, 2023
bb40f78
Do not hold mutex while waiting for state in screen
code-asher Aug 9, 2023
089e1f9
Remove incorrect statement about closing on Attach
code-asher Aug 9, 2023
ee67045
Remove backend type from SDK/API
code-asher Aug 9, 2023
a6bcdd2
Use PATH to test buffered reconnecting pty
code-asher Aug 9, 2023
3ff1510
Do not hold mutex while waiting for state
code-asher Aug 9, 2023
a781173
Avoid clobbering attach error with close errors
code-asher Aug 10, 2023
7a8ec2e
Immediately read screen process
code-asher Aug 10, 2023
56ca7ac
Fix incorrect logger context on reconnecting PTY
code-asher Aug 10, 2023
9b88a68
Protect map and state with the same mutex
code-asher Aug 10, 2023
56e71c9
Fix incorrect test comment
code-asher Aug 11, 2023
d4170ca
Attempt fixing flake with 'echo test' command
code-asher Aug 11, 2023
34c5c1a
Avoid wait callback when context expires
code-asher Aug 11, 2023
88a6b96
Remove err from wait callback
code-asher Aug 11, 2023
1fd4e9a
Add state wait func where caller holds the lock
code-asher Aug 14, 2023
57f464a
Merge remote-tracking branch 'github/main' into asher/reconnection-wi…
code-asher Aug 14, 2023
968526d
Remove unused fn
code-asher Aug 14, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
212 changes: 19 additions & 193 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"sync"
"time"

"github.com/armon/circbuf"
"github.com/go-chi/chi/v5"
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -36,12 +35,12 @@ import (

"cdr.dev/slog"
"github.com/coder/coder/agent/agentssh"
"github.com/coder/coder/agent/reconnectingpty"
"github.com/coder/coder/buildinfo"
"github.com/coder/coder/coderd/database"
"github.com/coder/coder/coderd/gitauth"
"github.com/coder/coder/codersdk"
"github.com/coder/coder/codersdk/agentsdk"
"github.com/coder/coder/pty"
"github.com/coder/coder/tailnet"
"github.com/coder/retry"
)
Expand Down Expand Up @@ -92,9 +91,6 @@ type Agent interface {
}

func New(options Options) Agent {
if options.ReconnectingPTYTimeout == 0 {
options.ReconnectingPTYTimeout = 5 * time.Minute
}
if options.Filesystem == nil {
options.Filesystem = afero.NewOsFs()
}
Expand Down Expand Up @@ -1075,8 +1071,8 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
defer a.connCountReconnectingPTY.Add(-1)

connectionID := uuid.NewString()
logger = logger.With(slog.F("message_id", msg.ID), slog.F("connection_id", connectionID))
logger.Debug(ctx, "starting handler")
connLogger := logger.With(slog.F("message_id", msg.ID), slog.F("connection_id", connectionID))
connLogger.Debug(ctx, "starting handler")

defer func() {
if err := retErr; err != nil {
Expand All @@ -1087,22 +1083,22 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
// If the agent is closed, we don't want to
// log this as an error since it's expected.
if closed {
logger.Debug(ctx, "reconnecting PTY failed with session error (agent closed)", slog.Error(err))
connLogger.Debug(ctx, "reconnecting pty failed with attach error (agent closed)", slog.Error(err))
} else {
logger.Error(ctx, "reconnecting PTY failed with session error", slog.Error(err))
connLogger.Error(ctx, "reconnecting pty failed with attach error", slog.Error(err))
}
}
logger.Debug(ctx, "session closed")
connLogger.Debug(ctx, "reconnecting pty connection closed")
}()

var rpty *reconnectingPTY
sendConnected := make(chan *reconnectingPTY, 1)
var rpty reconnectingpty.ReconnectingPTY
sendConnected := make(chan reconnectingpty.ReconnectingPTY, 1)
// On store, reserve this ID to prevent multiple concurrent new connections.
waitReady, ok := a.reconnectingPTYs.LoadOrStore(msg.ID, sendConnected)
if ok {
close(sendConnected) // Unused.
logger.Debug(ctx, "connecting to existing session")
c, ok := waitReady.(chan *reconnectingPTY)
connLogger.Debug(ctx, "connecting to existing reconnecting pty")
c, ok := waitReady.(chan reconnectingpty.ReconnectingPTY)
if !ok {
return xerrors.Errorf("found invalid type in reconnecting pty map: %T", waitReady)
}
Expand All @@ -1112,7 +1108,7 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
}
c <- rpty // Put it back for the next reconnect.
} else {
logger.Debug(ctx, "creating new session")
connLogger.Debug(ctx, "creating new reconnecting pty")

connected := false
defer func() {
Expand All @@ -1128,169 +1124,24 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
a.metrics.reconnectingPTYErrors.WithLabelValues("create_command").Add(1)
return xerrors.Errorf("create command: %w", err)
}
cmd.Env = append(cmd.Env, "TERM=xterm-256color")

// Default to buffer 64KiB.
circularBuffer, err := circbuf.NewBuffer(64 << 10)
if err != nil {
return xerrors.Errorf("create circular buffer: %w", err)
}

ptty, process, err := pty.Start(cmd)
if err != nil {
a.metrics.reconnectingPTYErrors.WithLabelValues("start_command").Add(1)
return xerrors.Errorf("start command: %w", err)
}
rpty = reconnectingpty.New(ctx, cmd, &reconnectingpty.Options{
Timeout: a.reconnectingPTYTimeout,
Metrics: a.metrics.reconnectingPTYErrors,
}, logger.With(slog.F("message_id", msg.ID)))

ctx, cancel := context.WithCancel(ctx)
rpty = &reconnectingPTY{
activeConns: map[string]net.Conn{
// We have to put the connection in the map instantly otherwise
// the connection won't be closed if the process instantly dies.
connectionID: conn,
},
ptty: ptty,
// Timeouts created with an after func can be reset!
timeout: time.AfterFunc(a.reconnectingPTYTimeout, cancel),
circularBuffer: circularBuffer,
}
// We don't need to separately monitor for the process exiting.
// When it exits, our ptty.OutputReader() will return EOF after
// reading all process output.
if err = a.trackConnGoroutine(func() {
buffer := make([]byte, 1024)
for {
read, err := rpty.ptty.OutputReader().Read(buffer)
if err != nil {
// When the PTY is closed, this is triggered.
// Error is typically a benign EOF, so only log for debugging.
if errors.Is(err, io.EOF) {
logger.Debug(ctx, "unable to read pty output, command might have exited", slog.Error(err))
} else {
logger.Warn(ctx, "unable to read pty output, command might have exited", slog.Error(err))
a.metrics.reconnectingPTYErrors.WithLabelValues("output_reader").Add(1)
}
break
}
part := buffer[:read]
rpty.circularBufferMutex.Lock()
_, err = rpty.circularBuffer.Write(part)
rpty.circularBufferMutex.Unlock()
if err != nil {
logger.Error(ctx, "write to circular buffer", slog.Error(err))
break
}
rpty.activeConnsMutex.Lock()
for cid, conn := range rpty.activeConns {
_, err = conn.Write(part)
if err != nil {
logger.Warn(ctx,
"error writing to active conn",
slog.F("other_conn_id", cid),
slog.Error(err),
)
a.metrics.reconnectingPTYErrors.WithLabelValues("write").Add(1)
}
}
rpty.activeConnsMutex.Unlock()
}

// Clean up the process and PTY, and delete its
// ID from memory.
_ = process.Kill()
rpty.Close()
rpty.Wait()
a.reconnectingPTYs.Delete(msg.ID)
}); err != nil {
_ = process.Kill()
_ = ptty.Close()
rpty.Close(err.Error())
return xerrors.Errorf("start routine: %w", err)
}

connected = true
sendConnected <- rpty
}
// Resize the PTY to initial height + width.
err := rpty.ptty.Resize(msg.Height, msg.Width)
if err != nil {
// We can continue after this, it's not fatal!
logger.Error(ctx, "reconnecting PTY initial resize failed, but will continue", slog.Error(err))
a.metrics.reconnectingPTYErrors.WithLabelValues("resize").Add(1)
}
// Write any previously stored data for the TTY.
rpty.circularBufferMutex.RLock()
prevBuf := slices.Clone(rpty.circularBuffer.Bytes())
rpty.circularBufferMutex.RUnlock()
// Note that there is a small race here between writing buffered
// data and storing conn in activeConns. This is likely a very minor
// edge case, but we should look into ways to avoid it. Holding
// activeConnsMutex would be one option, but holding this mutex
// while also holding circularBufferMutex seems dangerous.
_, err = conn.Write(prevBuf)
if err != nil {
a.metrics.reconnectingPTYErrors.WithLabelValues("write").Add(1)
return xerrors.Errorf("write buffer to conn: %w", err)
}
// Multiple connections to the same TTY are permitted.
// This could easily be used for terminal sharing, but
// we do it because it's a nice user experience to
// copy/paste a terminal URL and have it _just work_.
rpty.activeConnsMutex.Lock()
rpty.activeConns[connectionID] = conn
rpty.activeConnsMutex.Unlock()
// Resetting this timeout prevents the PTY from exiting.
rpty.timeout.Reset(a.reconnectingPTYTimeout)

ctx, cancelFunc := context.WithCancel(ctx)
defer cancelFunc()
heartbeat := time.NewTicker(a.reconnectingPTYTimeout / 2)
defer heartbeat.Stop()
go func() {
// Keep updating the activity while this
// connection is alive!
for {
select {
case <-ctx.Done():
return
case <-heartbeat.C:
}
rpty.timeout.Reset(a.reconnectingPTYTimeout)
}
}()
defer func() {
// After this connection ends, remove it from
// the PTY's active connections. If it isn't
// removed, all PTY data will be sent to it.
rpty.activeConnsMutex.Lock()
delete(rpty.activeConns, connectionID)
rpty.activeConnsMutex.Unlock()
}()
decoder := json.NewDecoder(conn)
var req codersdk.ReconnectingPTYRequest
for {
err = decoder.Decode(&req)
if xerrors.Is(err, io.EOF) {
return nil
}
if err != nil {
logger.Warn(ctx, "reconnecting PTY failed with read error", slog.Error(err))
return nil
}
_, err = rpty.ptty.InputWriter().Write([]byte(req.Data))
if err != nil {
logger.Warn(ctx, "reconnecting PTY failed with write error", slog.Error(err))
a.metrics.reconnectingPTYErrors.WithLabelValues("input_writer").Add(1)
return nil
}
// Check if a resize needs to happen!
if req.Height == 0 || req.Width == 0 {
continue
}
err = rpty.ptty.Resize(req.Height, req.Width)
if err != nil {
// We can continue after this, it's not fatal!
logger.Error(ctx, "reconnecting PTY resize failed, but will continue", slog.Error(err))
a.metrics.reconnectingPTYErrors.WithLabelValues("resize").Add(1)
}
}
return rpty.Attach(ctx, connectionID, conn, msg.Height, msg.Width, connLogger)
}

// startReportingConnectionStats runs the connection stats reporting goroutine.
Expand Down Expand Up @@ -1541,31 +1392,6 @@ lifecycleWaitLoop:
return nil
}

// reconnectingPTY holds the state of a single PTY session that multiple
// network connections can attach to, detach from, and later re-attach to.
type reconnectingPTY struct {
// activeConnsMutex guards activeConns.
activeConnsMutex sync.Mutex
// activeConns maps a connection ID to the connection currently attached
// to this PTY. PTY output is written to every connection in the map.
activeConns map[string]net.Conn

// circularBuffer stores recent PTY output so that it can be replayed to
// a connection when it attaches.
circularBuffer *circbuf.Buffer
// circularBufferMutex guards circularBuffer.
circularBufferMutex sync.RWMutex
// timeout is the inactivity timer. It is reset when a connection attaches
// and periodically while a connection remains alive.
timeout *time.Timer
// ptty is the PTY whose output is read and whose input is written on
// behalf of the attached connections.
ptty pty.PTYCmd
}

// Close ends all connections to the reconnecting PTY, closes the PTY,
// clears the circular buffer, and stops the inactivity timer.
//
// Each connection is removed from activeConns as it is closed so that
// the output loop, which ranges over activeConns, never tries to write
// PTY output to an already-closed connection.
//
// activeConnsMutex is held for the entire call.
func (r *reconnectingPTY) Close() {
	r.activeConnsMutex.Lock()
	defer r.activeConnsMutex.Unlock()
	for id, conn := range r.activeConns {
		// Best effort: a close error leaves nothing further to clean up.
		_ = conn.Close()
		// Removing entries while ranging over a map is safe in Go.
		delete(r.activeConns, id)
	}
	// Best effort: a close error leaves nothing further to clean up.
	_ = r.ptty.Close()
	r.circularBufferMutex.Lock()
	r.circularBuffer.Reset()
	r.circularBufferMutex.Unlock()
	r.timeout.Stop()
}

// userHomeDir returns the home directory of the current user, giving
// priority to the $HOME environment variable.
func userHomeDir() (string, error) {
Expand Down
Loading