forked from coder/coder
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathagentscripts.go
More file actions
314 lines (289 loc) · 8.96 KB
/
agentscripts.go
File metadata and controls
314 lines (289 loc) · 8.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
package agentscripts
import (
"context"
"errors"
"fmt"
"io"
"os"
"os/exec"
"os/user"
"path/filepath"
"sync"
"sync/atomic"
"time"
"github.com/robfig/cron/v3"
"github.com/spf13/afero"
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"
"cdr.dev/slog"
"github.com/coder/coder/v2/agent/agentssh"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/agentsdk"
)
// Sentinel errors reported by the runner when a script does not finish cleanly.
var (
	// ErrTimeout is returned when a script exceeds its configured timeout.
	ErrTimeout = xerrors.New("script timed out")

	// ErrOutputPipesOpen is returned when a script exits but leaves its
	// output pipe(s) (stdout, stderr) open. The runner sets WaitDelay on the
	// command it executes, which provides two things:
	//
	//  1. A guarantee that waiting on a script eventually ends (important
	//     for e.g. blocking login, which must not block indefinitely).
	//  2. Improved command cancellation on timeout.
	//
	// When that delay elapses with the pipes still open, this error is
	// reported instead of the underlying os/exec error.
	ErrOutputPipesOpen = xerrors.New("script exited without closing output pipes")
)

// parser understands cron expressions with a seconds field, minute, hour,
// day-of-month and month fields, and an optional day-of-week field.
var parser = cron.NewParser(cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.DowOptional)
// Options are a set of options for the runner.
type Options struct {
	// LogDir is the directory that non-absolute script log paths are joined
	// onto.
	LogDir string
	// Logger receives the runner's own diagnostic messages.
	Logger slog.Logger
	// SSHServer creates the command that is executed for each script.
	SSHServer *agentssh.Server
	// Filesystem is used to open the per-script log file.
	Filesystem afero.Fs
	// PatchLogs is passed to agentsdk.LogsSender, whose writers receive the
	// script's stdout and stderr.
	PatchLogs func(ctx context.Context, req agentsdk.PatchLogs) error
}
// New builds a Runner from opts. The returned runner owns a cancellable
// context for scheduled runs and a cron scheduler that uses the package
// parser; scripts are supplied later through Init.
func New(opts Options) *Runner {
	runner := &Runner{
		Options: opts,
		cron:    cron.New(cron.WithParser(parser)),
		closed:  make(chan struct{}),
	}
	// Scheduled scripts run under this context so Close can cancel them.
	runner.cronCtx, runner.cronCtxCancel = context.WithCancel(context.Background())
	return runner
}
// Runner executes workspace agent scripts, either on demand through Execute
// or on a cron schedule, and tracks the goroutines it starts so that Close can
// wait for them.
type Runner struct {
	Options

	// cronCtx is the context handed to scheduled script runs.
	cronCtx context.Context
	// cronCtxCancel cancels cronCtx; Close calls it.
	cronCtxCancel context.CancelFunc
	// cmdCloseWait counts goroutines started via trackCommandGoroutine.
	cmdCloseWait sync.WaitGroup
	// closed is closed by Close to mark the runner as shut down.
	closed chan struct{}
	// closeMutex is held by Close and by trackCommandGoroutine.
	closeMutex sync.Mutex
	// cron is the scheduler that scripts with a cron expression are added to.
	cron *cron.Cron
	// initialized records that Init has been called.
	initialized atomic.Bool
	// scripts is the script list stored by Init and iterated by Execute.
	scripts []codersdk.WorkspaceAgentScript
}
// Init initializes the runner with the provided scripts and registers every
// script that has a cron expression with the scheduler. The scheduler itself
// is not started here; see StartCron.
//
// Init must be called before Execute and may only be called once: a second
// call returns an error and leaves the runner untouched. An error is also
// returned when a script's cron expression cannot be added to the scheduler.
func (r *Runner) Init(scripts []codersdk.WorkspaceAgentScript) error {
	// Claim initialization atomically. A separate Load followed by Store
	// would let two concurrent callers both pass the check.
	if !r.initialized.CompareAndSwap(false, true) {
		return xerrors.New("init: already initialized")
	}
	r.scripts = scripts
	r.Logger.Info(r.cronCtx, "initializing agent scripts", slog.F("script_count", len(scripts)), slog.F("log_dir", r.LogDir))

	for _, script := range scripts {
		if script.Cron == "" {
			// Not a scheduled script.
			continue
		}
		// Per-iteration copy so each scheduled closure sees its own script.
		script := script
		_, err := r.cron.AddFunc(script.Cron, func() {
			// Scheduled runs use cronCtx so that Close can cancel them.
			err := r.run(r.cronCtx, script)
			if err != nil {
				r.Logger.Warn(context.Background(), "run agent script on schedule", slog.Error(err))
			}
		})
		if err != nil {
			return xerrors.Errorf("add schedule: %w", err)
		}
	}
	return nil
}
// StartCron launches the cron scheduler in a tracked background goroutine.
// Starting is separate from Init so that the caller can execute scripts
// before any scheduled run fires. A failure to start is only logged.
func (r *Runner) StartCron() {
	// cron.Start() and cron.Stop() do not guarantee that the cron goroutine
	// has exited by the time the context returned by cron.Stop() is done, so
	// the scheduler loop is run via trackCommandGoroutine and tracked
	// manually.
	if err := r.trackCommandGoroutine(r.cron.Run); err != nil {
		r.Logger.Warn(context.Background(), "start cron failed", slog.Error(err))
	}
}
// Execute concurrently runs every script for which filter returns true and
// waits for all of them to finish. A nil filter selects all scripts. The
// returned error is whatever the error group reports from the failed runs,
// wrapped with the script's log source ID.
func (r *Runner) Execute(ctx context.Context, filter func(script codersdk.WorkspaceAgentScript) bool) error {
	var group errgroup.Group
	for _, candidate := range r.scripts {
		// Per-iteration copy so each goroutine sees its own script.
		candidate := candidate
		if filter != nil && !filter(candidate) {
			continue
		}
		group.Go(func() error {
			if err := r.run(ctx, candidate); err != nil {
				return xerrors.Errorf("run agent script %q: %w", candidate.LogSourceID, err)
			}
			return nil
		})
	}
	return group.Wait()
}
// run executes a single script and mirrors its stdout/stderr to both a log
// file and the agent log sender.
//
// The log file path comes from script.LogPath (or a default derived from
// script.LogSourceID); a leading '~' is replaced with the home directory,
// environment variables are expanded, and a relative path is joined onto
// r.LogDir.
//
// When script.Timeout is positive the command runs under a context with that
// timeout. Once the command context is done, run waits up to 10 seconds for
// the command to finish before returning; it does not wait beyond that.
// How the process is signalled on cancellation is decided by cmdCancel, which
// is defined elsewhere (NOTE(review): presumably an interrupt, confirm there).
//
// Returns ErrTimeout when the deadline was exceeded, ErrOutputPipesOpen when
// os/exec reports ErrWaitDelay, otherwise the command/context error or nil.
func (r *Runner) run(ctx context.Context, script codersdk.WorkspaceAgentScript) error {
	logPath := script.LogPath
	if logPath == "" {
		// No explicit path: derive a file name from the log source ID.
		logPath = fmt.Sprintf("coder-script-%s.log", script.LogSourceID)
	}
	if logPath[0] == '~' {
		// First we check the environment (os.UserHomeDir), then fall back to
		// the current user's home directory.
		homeDir, err := os.UserHomeDir()
		if err != nil {
			u, err := user.Current()
			if err != nil {
				return xerrors.Errorf("current user: %w", err)
			}
			homeDir = u.HomeDir
		}
		// Replace only the leading '~' with the home directory.
		logPath = filepath.Join(homeDir, logPath[1:])
	}
	logPath = os.ExpandEnv(logPath)
	if !filepath.IsAbs(logPath) {
		// Relative paths live under the configured log directory.
		logPath = filepath.Join(r.LogDir, logPath)
	}

	logger := r.Logger.With(slog.F("log_path", logPath))
	logger.Info(ctx, "running agent script", slog.F("script", script.Script))

	// The file is opened without truncate or append flags, so writes start
	// at the beginning of any existing file.
	fileWriter, err := r.Filesystem.OpenFile(logPath, os.O_CREATE|os.O_RDWR, 0o600)
	if err != nil {
		return xerrors.Errorf("open %s script log file: %w", logPath, err)
	}
	defer func() {
		err := fileWriter.Close()
		if err != nil {
			logger.Warn(ctx, fmt.Sprintf("close %s script log file", logPath), slog.Error(err))
		}
	}()

	var cmd *exec.Cmd
	cmdCtx := ctx
	if script.Timeout > 0 {
		// Only scripts with a positive timeout get a deadline.
		var ctxCancel context.CancelFunc
		cmdCtx, ctxCancel = context.WithTimeout(ctx, script.Timeout)
		defer ctxCancel()
	}
	cmdPty, err := r.SSHServer.CreateCommand(cmdCtx, script.Script, nil)
	if err != nil {
		return xerrors.Errorf("%s script: create command: %w", logPath, err)
	}
	cmd = cmdPty.AsExec()
	cmd.SysProcAttr = cmdSysProcAttr()
	// WaitDelay bounds how long Wait keeps waiting after the context is done
	// or the process has exited with its output pipes still open.
	cmd.WaitDelay = 10 * time.Second
	cmd.Cancel = cmdCancel(cmd)

	send, flushAndClose := agentsdk.LogsSender(script.LogSourceID, r.PatchLogs, logger)
	// If ctx is canceled here (or in a writer below), we may be
	// discarding logs, but that's okay because we're shutting down
	// anyway. We could consider creating a new context here if we
	// want better control over flush during shutdown.
	defer func() {
		if err := flushAndClose(ctx); err != nil {
			logger.Warn(ctx, "flush startup logs failed", slog.Error(err))
		}
	}()

	// Deferred calls run last-in-first-out, so both writers are closed
	// before flushAndClose runs.
	infoW := agentsdk.LogsWriter(ctx, send, script.LogSourceID, codersdk.LogLevelInfo)
	defer infoW.Close()
	errW := agentsdk.LogsWriter(ctx, send, script.LogSourceID, codersdk.LogLevelError)
	defer errW.Close()
	// stdout is sent at info level and stderr at error level; both also go
	// to the log file.
	cmd.Stdout = io.MultiWriter(fileWriter, infoW)
	cmd.Stderr = io.MultiWriter(fileWriter, errW)

	start := time.Now()
	defer func() {
		// Log the outcome using the value err holds when run returns.
		end := time.Now()
		execTime := end.Sub(start)
		exitCode := 0
		if err != nil {
			exitCode = 255 // Unknown status.
			var exitError *exec.ExitError
			if xerrors.As(err, &exitError) {
				exitCode = exitError.ExitCode()
			}
			logger.Warn(ctx, fmt.Sprintf("%s script failed", logPath), slog.F("execution_time", execTime), slog.F("exit_code", exitCode), slog.Error(err))
		} else {
			logger.Info(ctx, fmt.Sprintf("%s script completed", logPath), slog.F("execution_time", execTime), slog.F("exit_code", exitCode))
		}
	}()

	err = cmd.Start()
	if err != nil {
		if errors.Is(err, context.DeadlineExceeded) {
			return ErrTimeout
		}
		return xerrors.Errorf("%s script: start command: %w", logPath, err)
	}

	// Wait for the command in a tracked goroutine. The channel is buffered
	// so that goroutine can finish even if run has already returned.
	cmdDone := make(chan error, 1)
	err = r.trackCommandGoroutine(func() {
		cmdDone <- cmd.Wait()
	})
	if err != nil {
		return xerrors.Errorf("%s script: track command goroutine: %w", logPath, err)
	}
	select {
	case <-cmdCtx.Done():
		// Wait for the command to drain, but no longer than 10 seconds.
		select {
		case <-cmdDone:
		case <-time.After(10 * time.Second):
		}
		// The context error takes precedence over the command's own result.
		err = cmdCtx.Err()
	case err = <-cmdDone:
	}
	// Translate low-level errors into the package's sentinel errors.
	switch {
	case errors.Is(err, exec.ErrWaitDelay):
		err = ErrOutputPipesOpen
		message := fmt.Sprintf("script exited successfully, but output pipes were not closed after %s", cmd.WaitDelay)
		details := fmt.Sprint(
			"This usually means a child process was started with references to stdout or stderr. As a result, this " +
				"process may now have been terminated. Consider redirecting the output or using a separate " +
				"\"coder_script\" for the process, see " +
				"https://coder.com/docs/v2/latest/templates/troubleshooting#startup-script-issues for more information.",
		)
		// Inform the user by propagating the message via log writers.
		_, _ = fmt.Fprintf(cmd.Stderr, "WARNING: %s. %s\n", message, details)
		// Also log to agent logs for ease of debugging.
		r.Logger.Warn(ctx, message, slog.F("details", details), slog.Error(err))
	case errors.Is(err, context.DeadlineExceeded):
		err = ErrTimeout
	}
	return err
}
// Close shuts the runner down: it marks the runner closed, cancels the context
// used by scheduled runs, stops the cron scheduler, and then waits for running
// scheduled jobs and for every goroutine started via trackCommandGoroutine.
//
// Close is safe to call more than once and from multiple goroutines; every
// call returns only after the shutdown has completed. It always returns nil.
func (r *Runner) Close() error {
	// Only the state transition happens under closeMutex. The waits below
	// must not hold it: a scheduled job that is already executing may still
	// need the mutex (through trackCommandGoroutine) before it can finish,
	// and cron.Stop's context is only done once such jobs have finished.
	// Holding the lock while waiting would therefore deadlock.
	r.closeMutex.Lock()
	if !r.isClosed() {
		close(r.closed)
		// Cancel scheduled runs before stopping the scheduler.
		r.cronCtxCancel()
	}
	r.closeMutex.Unlock()

	// Once closed is set no new goroutine can be tracked, so the wait group
	// counter can only go down from here.
	<-r.cron.Stop().Done()
	r.cmdCloseWait.Wait()
	return nil
}
// trackCommandGoroutine runs fn in a new goroutine that is counted in
// cmdCloseWait, so that Close can wait for it. It returns an error, without
// starting fn, when the runner has already been closed.
func (r *Runner) trackCommandGoroutine(fn func()) error {
	r.closeMutex.Lock()
	defer r.closeMutex.Unlock()

	// Refuse new work once the closed channel has been closed.
	select {
	case <-r.closed:
		return xerrors.New("track command goroutine: closed")
	default:
	}

	// Add is performed under the mutex, before the goroutine starts.
	r.cmdCloseWait.Add(1)
	go func() {
		defer r.cmdCloseWait.Done()
		fn()
	}()
	return nil
}
// isClosed reports, without blocking, whether the closed channel has been
// closed.
func (r *Runner) isClosed() (closed bool) {
	select {
	case <-r.closed:
		closed = true
	default:
		// Channel still open: not closed.
	}
	return closed
}