@@ -21,7 +21,6 @@ import (
2121 "sync"
2222 "time"
2323
24- "github.com/armon/circbuf"
2524 "github.com/go-chi/chi/v5"
2625 "github.com/google/uuid"
2726 "github.com/prometheus/client_golang/prometheus"
@@ -36,12 +35,12 @@ import (
3635
3736 "cdr.dev/slog"
3837 "github.com/coder/coder/agent/agentssh"
38+ "github.com/coder/coder/agent/reconnectingpty"
3939 "github.com/coder/coder/buildinfo"
4040 "github.com/coder/coder/coderd/database"
4141 "github.com/coder/coder/coderd/gitauth"
4242 "github.com/coder/coder/codersdk"
4343 "github.com/coder/coder/codersdk/agentsdk"
44- "github.com/coder/coder/pty"
4544 "github.com/coder/coder/tailnet"
4645 "github.com/coder/retry"
4746)
@@ -92,9 +91,6 @@ type Agent interface {
9291}
9392
9493func New (options Options ) Agent {
95- if options .ReconnectingPTYTimeout == 0 {
96- options .ReconnectingPTYTimeout = 5 * time .Minute
97- }
9894 if options .Filesystem == nil {
9995 options .Filesystem = afero .NewOsFs ()
10096 }
@@ -1075,8 +1071,8 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
10751071 defer a .connCountReconnectingPTY .Add (- 1 )
10761072
10771073 connectionID := uuid .NewString ()
1078- logger = logger .With (slog .F ("message_id" , msg .ID ), slog .F ("connection_id" , connectionID ))
1079- logger .Debug (ctx , "starting handler" )
1074+ connLogger : = logger .With (slog .F ("message_id" , msg .ID ), slog .F ("connection_id" , connectionID ))
1075+ connLogger .Debug (ctx , "starting handler" )
10801076
10811077 defer func () {
10821078 if err := retErr ; err != nil {
@@ -1087,22 +1083,22 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
10871083 // If the agent is closed, we don't want to
10881084 // log this as an error since it's expected.
10891085 if closed {
1090- logger .Debug (ctx , "reconnecting PTY failed with session error (agent closed)" , slog .Error (err ))
1086+ connLogger .Debug (ctx , "reconnecting pty failed with attach error (agent closed)" , slog .Error (err ))
10911087 } else {
1092- logger .Error (ctx , "reconnecting PTY failed with session error" , slog .Error (err ))
1088+ connLogger .Error (ctx , "reconnecting pty failed with attach error" , slog .Error (err ))
10931089 }
10941090 }
1095- logger .Debug (ctx , "session closed" )
1091+ connLogger .Debug (ctx , "reconnecting pty connection closed" )
10961092 }()
10971093
1098- var rpty * reconnectingPTY
1099- sendConnected := make (chan * reconnectingPTY , 1 )
1094+ var rpty reconnectingpty. ReconnectingPTY
1095+ sendConnected := make (chan reconnectingpty. ReconnectingPTY , 1 )
11001096 // On store, reserve this ID to prevent multiple concurrent new connections.
11011097 waitReady , ok := a .reconnectingPTYs .LoadOrStore (msg .ID , sendConnected )
11021098 if ok {
11031099 close (sendConnected ) // Unused.
1104- logger .Debug (ctx , "connecting to existing session " )
1105- c , ok := waitReady .(chan * reconnectingPTY )
1100+ connLogger .Debug (ctx , "connecting to existing reconnecting pty " )
1101+ c , ok := waitReady .(chan reconnectingpty. ReconnectingPTY )
11061102 if ! ok {
11071103 return xerrors .Errorf ("found invalid type in reconnecting pty map: %T" , waitReady )
11081104 }
@@ -1112,7 +1108,7 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
11121108 }
11131109 c <- rpty // Put it back for the next reconnect.
11141110 } else {
1115- logger .Debug (ctx , "creating new session " )
1111+ connLogger .Debug (ctx , "creating new reconnecting pty " )
11161112
11171113 connected := false
11181114 defer func () {
@@ -1128,169 +1124,24 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
11281124 a .metrics .reconnectingPTYErrors .WithLabelValues ("create_command" ).Add (1 )
11291125 return xerrors .Errorf ("create command: %w" , err )
11301126 }
1131- cmd .Env = append (cmd .Env , "TERM=xterm-256color" )
1132-
1133- // Default to buffer 64KiB.
1134- circularBuffer , err := circbuf .NewBuffer (64 << 10 )
1135- if err != nil {
1136- return xerrors .Errorf ("create circular buffer: %w" , err )
1137- }
11381127
1139- ptty , process , err := pty .Start (cmd )
1140- if err != nil {
1141- a .metrics .reconnectingPTYErrors .WithLabelValues ("start_command" ).Add (1 )
1142- return xerrors .Errorf ("start command: %w" , err )
1143- }
1128+ rpty = reconnectingpty .New (ctx , cmd , & reconnectingpty.Options {
1129+ Timeout : a .reconnectingPTYTimeout ,
1130+ Metrics : a .metrics .reconnectingPTYErrors ,
1131+ }, logger .With (slog .F ("message_id" , msg .ID )))
11441132
1145- ctx , cancel := context .WithCancel (ctx )
1146- rpty = & reconnectingPTY {
1147- activeConns : map [string ]net.Conn {
1148- // We have to put the connection in the map instantly otherwise
1149- // the connection won't be closed if the process instantly dies.
1150- connectionID : conn ,
1151- },
1152- ptty : ptty ,
1153- // Timeouts created with an after func can be reset!
1154- timeout : time .AfterFunc (a .reconnectingPTYTimeout , cancel ),
1155- circularBuffer : circularBuffer ,
1156- }
1157- // We don't need to separately monitor for the process exiting.
1158- // When it exits, our ptty.OutputReader() will return EOF after
1159- // reading all process output.
11601133 if err = a .trackConnGoroutine (func () {
1161- buffer := make ([]byte , 1024 )
1162- for {
1163- read , err := rpty .ptty .OutputReader ().Read (buffer )
1164- if err != nil {
1165- // When the PTY is closed, this is triggered.
1166- // Error is typically a benign EOF, so only log for debugging.
1167- if errors .Is (err , io .EOF ) {
1168- logger .Debug (ctx , "unable to read pty output, command might have exited" , slog .Error (err ))
1169- } else {
1170- logger .Warn (ctx , "unable to read pty output, command might have exited" , slog .Error (err ))
1171- a .metrics .reconnectingPTYErrors .WithLabelValues ("output_reader" ).Add (1 )
1172- }
1173- break
1174- }
1175- part := buffer [:read ]
1176- rpty .circularBufferMutex .Lock ()
1177- _ , err = rpty .circularBuffer .Write (part )
1178- rpty .circularBufferMutex .Unlock ()
1179- if err != nil {
1180- logger .Error (ctx , "write to circular buffer" , slog .Error (err ))
1181- break
1182- }
1183- rpty .activeConnsMutex .Lock ()
1184- for cid , conn := range rpty .activeConns {
1185- _ , err = conn .Write (part )
1186- if err != nil {
1187- logger .Warn (ctx ,
1188- "error writing to active conn" ,
1189- slog .F ("other_conn_id" , cid ),
1190- slog .Error (err ),
1191- )
1192- a .metrics .reconnectingPTYErrors .WithLabelValues ("write" ).Add (1 )
1193- }
1194- }
1195- rpty .activeConnsMutex .Unlock ()
1196- }
1197-
1198- // Cleanup the process, PTY, and delete it's
1199- // ID from memory.
1200- _ = process .Kill ()
1201- rpty .Close ()
1134+ rpty .Wait ()
12021135 a .reconnectingPTYs .Delete (msg .ID )
12031136 }); err != nil {
1204- _ = process .Kill ()
1205- _ = ptty .Close ()
1137+ rpty .Close (err .Error ())
12061138 return xerrors .Errorf ("start routine: %w" , err )
12071139 }
1140+
12081141 connected = true
12091142 sendConnected <- rpty
12101143 }
1211- // Resize the PTY to initial height + width.
1212- err := rpty .ptty .Resize (msg .Height , msg .Width )
1213- if err != nil {
1214- // We can continue after this, it's not fatal!
1215- logger .Error (ctx , "reconnecting PTY initial resize failed, but will continue" , slog .Error (err ))
1216- a .metrics .reconnectingPTYErrors .WithLabelValues ("resize" ).Add (1 )
1217- }
1218- // Write any previously stored data for the TTY.
1219- rpty .circularBufferMutex .RLock ()
1220- prevBuf := slices .Clone (rpty .circularBuffer .Bytes ())
1221- rpty .circularBufferMutex .RUnlock ()
1222- // Note that there is a small race here between writing buffered
1223- // data and storing conn in activeConns. This is likely a very minor
1224- // edge case, but we should look into ways to avoid it. Holding
1225- // activeConnsMutex would be one option, but holding this mutex
1226- // while also holding circularBufferMutex seems dangerous.
1227- _ , err = conn .Write (prevBuf )
1228- if err != nil {
1229- a .metrics .reconnectingPTYErrors .WithLabelValues ("write" ).Add (1 )
1230- return xerrors .Errorf ("write buffer to conn: %w" , err )
1231- }
1232- // Multiple connections to the same TTY are permitted.
1233- // This could easily be used for terminal sharing, but
1234- // we do it because it's a nice user experience to
1235- // copy/paste a terminal URL and have it _just work_.
1236- rpty .activeConnsMutex .Lock ()
1237- rpty .activeConns [connectionID ] = conn
1238- rpty .activeConnsMutex .Unlock ()
1239- // Resetting this timeout prevents the PTY from exiting.
1240- rpty .timeout .Reset (a .reconnectingPTYTimeout )
1241-
1242- ctx , cancelFunc := context .WithCancel (ctx )
1243- defer cancelFunc ()
1244- heartbeat := time .NewTicker (a .reconnectingPTYTimeout / 2 )
1245- defer heartbeat .Stop ()
1246- go func () {
1247- // Keep updating the activity while this
1248- // connection is alive!
1249- for {
1250- select {
1251- case <- ctx .Done ():
1252- return
1253- case <- heartbeat .C :
1254- }
1255- rpty .timeout .Reset (a .reconnectingPTYTimeout )
1256- }
1257- }()
1258- defer func () {
1259- // After this connection ends, remove it from
1260- // the PTYs active connections. If it isn't
1261- // removed, all PTY data will be sent to it.
1262- rpty .activeConnsMutex .Lock ()
1263- delete (rpty .activeConns , connectionID )
1264- rpty .activeConnsMutex .Unlock ()
1265- }()
1266- decoder := json .NewDecoder (conn )
1267- var req codersdk.ReconnectingPTYRequest
1268- for {
1269- err = decoder .Decode (& req )
1270- if xerrors .Is (err , io .EOF ) {
1271- return nil
1272- }
1273- if err != nil {
1274- logger .Warn (ctx , "reconnecting PTY failed with read error" , slog .Error (err ))
1275- return nil
1276- }
1277- _ , err = rpty .ptty .InputWriter ().Write ([]byte (req .Data ))
1278- if err != nil {
1279- logger .Warn (ctx , "reconnecting PTY failed with write error" , slog .Error (err ))
1280- a .metrics .reconnectingPTYErrors .WithLabelValues ("input_writer" ).Add (1 )
1281- return nil
1282- }
1283- // Check if a resize needs to happen!
1284- if req .Height == 0 || req .Width == 0 {
1285- continue
1286- }
1287- err = rpty .ptty .Resize (req .Height , req .Width )
1288- if err != nil {
1289- // We can continue after this, it's not fatal!
1290- logger .Error (ctx , "reconnecting PTY resize failed, but will continue" , slog .Error (err ))
1291- a .metrics .reconnectingPTYErrors .WithLabelValues ("resize" ).Add (1 )
1292- }
1293- }
1144+ return rpty .Attach (ctx , connectionID , conn , msg .Height , msg .Width , connLogger )
12941145}
12951146
12961147// startReportingConnectionStats runs the connection stats reporting goroutine.
@@ -1541,31 +1392,6 @@ lifecycleWaitLoop:
15411392 return nil
15421393}
15431394
1544- type reconnectingPTY struct {
1545- activeConnsMutex sync.Mutex
1546- activeConns map [string ]net.Conn
1547-
1548- circularBuffer * circbuf.Buffer
1549- circularBufferMutex sync.RWMutex
1550- timeout * time.Timer
1551- ptty pty.PTYCmd
1552- }
1553-
1554- // Close ends all connections to the reconnecting
1555- // PTY and clear the circular buffer.
1556- func (r * reconnectingPTY ) Close () {
1557- r .activeConnsMutex .Lock ()
1558- defer r .activeConnsMutex .Unlock ()
1559- for _ , conn := range r .activeConns {
1560- _ = conn .Close ()
1561- }
1562- _ = r .ptty .Close ()
1563- r .circularBufferMutex .Lock ()
1564- r .circularBuffer .Reset ()
1565- r .circularBufferMutex .Unlock ()
1566- r .timeout .Stop ()
1567- }
1568-
15691395// userHomeDir returns the home directory of the current user, giving
15701396// priority to the $HOME environment variable.
15711397func userHomeDir () (string , error ) {
0 commit comments