@@ -16,6 +16,7 @@ import (
1616 "strconv"
1717 "strings"
1818 "sync"
19+ "sync/atomic"
1920 "testing"
2021 "time"
2122
@@ -503,6 +504,45 @@ func TestAgent(t *testing.T) {
503504 require .NoError (t , err )
504505 t .Logf ("%.2f MBits/s" , res [len (res )- 1 ].MBitsPerSecond ())
505506 })
507+
508+ t .Run ("Reconnect" , func (t * testing.T ) {
509+ t .Parallel ()
510+ // After the agent is disconnected from a coordinator, it's supposed
511+ // to reconnect!
512+ coordinator := tailnet .NewCoordinator ()
513+ agentID := uuid .New ()
514+ statsCh := make (chan * codersdk.AgentStats )
515+ derpMap := tailnettest .RunDERPAndSTUN (t )
516+ client := & client {
517+ t : t ,
518+ agentID : agentID ,
519+ metadata : codersdk.WorkspaceAgentMetadata {
520+ DERPMap : derpMap ,
521+ },
522+ statsChan : statsCh ,
523+ coordinator : coordinator ,
524+ }
525+ initialized := atomic.Int32 {}
526+ closer := agent .New (agent.Options {
527+ ExchangeToken : func (ctx context.Context ) error {
528+ initialized .Add (1 )
529+ return nil
530+ },
531+ Client : client ,
532+ Logger : slogtest .Make (t , nil ).Leveled (slog .LevelInfo ),
533+ })
534+ t .Cleanup (func () {
535+ _ = closer .Close ()
536+ })
537+
538+ require .Eventually (t , func () bool {
539+ return coordinator .Node (agentID ) != nil
540+ }, testutil .WaitShort , testutil .IntervalFast )
541+ client .lastWorkspaceAgent ()
542+ require .Eventually (t , func () bool {
543+ return initialized .Load () == 2
544+ }, testutil .WaitShort , testutil .IntervalFast )
545+ })
506546}
507547
508548func setupSSHCommand (t * testing.T , beforeArgs []string , afterArgs []string ) * exec.Cmd {
@@ -572,57 +612,15 @@ func setupAgent(t *testing.T, metadata codersdk.WorkspaceAgentMetadata, ptyTimeo
572612 agentID := uuid .New ()
573613 statsCh := make (chan * codersdk.AgentStats )
574614 closer := agent .New (agent.Options {
575- FetchMetadata : func (ctx context.Context ) (codersdk.WorkspaceAgentMetadata , error ) {
576- return metadata , nil
577- },
578- CoordinatorDialer : func (ctx context.Context ) (net.Conn , error ) {
579- clientConn , serverConn := net .Pipe ()
580- closed := make (chan struct {})
581- t .Cleanup (func () {
582- _ = serverConn .Close ()
583- _ = clientConn .Close ()
584- <- closed
585- })
586- go func () {
587- _ = coordinator .ServeAgent (serverConn , agentID )
588- close (closed )
589- }()
590- return clientConn , nil
615+ Client : & client {
616+ t : t ,
617+ agentID : agentID ,
618+ metadata : metadata ,
619+ statsChan : statsCh ,
620+ coordinator : coordinator ,
591621 },
592622 Logger : slogtest .Make (t , nil ).Leveled (slog .LevelDebug ),
593623 ReconnectingPTYTimeout : ptyTimeout ,
594- StatsReporter : func (ctx context.Context , log slog.Logger , statsFn func () * codersdk.AgentStats ) (io.Closer , error ) {
595- doneCh := make (chan struct {})
596- ctx , cancel := context .WithCancel (ctx )
597-
598- go func () {
599- defer close (doneCh )
600-
601- t := time .NewTicker (time .Millisecond * 100 )
602- defer t .Stop ()
603- for {
604- select {
605- case <- ctx .Done ():
606- return
607- case <- t .C :
608- }
609- select {
610- case statsCh <- statsFn ():
611- case <- ctx .Done ():
612- return
613- default :
614- // We don't want to send old stats.
615- continue
616- }
617- }
618- }()
619- return closeFunc (func () error {
620- cancel ()
621- <- doneCh
622- close (statsCh )
623- return nil
624- }), nil
625- },
626624 })
627625 t .Cleanup (func () {
628626 _ = closer .Close ()
@@ -679,3 +677,73 @@ func assertWritePayload(t *testing.T, w io.Writer, payload []byte) {
679677 assert .NoError (t , err , "write payload" )
680678 assert .Equal (t , len (payload ), n , "payload length does not match" )
681679}
680+
681+ type client struct {
682+ t * testing.T
683+ agentID uuid.UUID
684+ metadata codersdk.WorkspaceAgentMetadata
685+ statsChan chan * codersdk.AgentStats
686+ coordinator tailnet.Coordinator
687+ lastWorkspaceAgent func ()
688+ }
689+
690+ func (c * client ) WorkspaceAgentMetadata (_ context.Context ) (codersdk.WorkspaceAgentMetadata , error ) {
691+ return c .metadata , nil
692+ }
693+
694+ func (c * client ) ListenWorkspaceAgent (_ context.Context ) (net.Conn , error ) {
695+ clientConn , serverConn := net .Pipe ()
696+ closed := make (chan struct {})
697+ c .lastWorkspaceAgent = func () {
698+ _ = serverConn .Close ()
699+ _ = clientConn .Close ()
700+ <- closed
701+ }
702+ c .t .Cleanup (c .lastWorkspaceAgent )
703+ go func () {
704+ _ = c .coordinator .ServeAgent (serverConn , c .agentID )
705+ close (closed )
706+ }()
707+ return clientConn , nil
708+ }
709+
710+ func (c * client ) AgentReportStats (ctx context.Context , _ slog.Logger , stats func () * codersdk.AgentStats ) (io.Closer , error ) {
711+ doneCh := make (chan struct {})
712+ ctx , cancel := context .WithCancel (ctx )
713+
714+ go func () {
715+ defer close (doneCh )
716+
717+ t := time .NewTicker (time .Millisecond * 100 )
718+ defer t .Stop ()
719+ for {
720+ select {
721+ case <- ctx .Done ():
722+ return
723+ case <- t .C :
724+ }
725+ select {
726+ case c .statsChan <- stats ():
727+ case <- ctx .Done ():
728+ return
729+ default :
730+ // We don't want to send old stats.
731+ continue
732+ }
733+ }
734+ }()
735+ return closeFunc (func () error {
736+ cancel ()
737+ <- doneCh
738+ close (c .statsChan )
739+ return nil
740+ }), nil
741+ }
742+
743+ func (* client ) PostWorkspaceAgentAppHealth (_ context.Context , _ codersdk.PostWorkspaceAppHealthsRequest ) error {
744+ return nil
745+ }
746+
747+ func (* client ) PostWorkspaceAgentVersion (_ context.Context , _ string ) error {
748+ return nil
749+ }
0 commit comments