From 94c600440d8de50ea0c957985f50fd0e235b80db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Tich=C3=A1k?= Date: Wed, 10 Dec 2025 15:55:01 +0100 Subject: [PATCH 01/16] [core] part of OCTRL-1076, fixing leaked logging goroutines --- executor/executable/basictaskcommon.go | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/executor/executable/basictaskcommon.go b/executor/executable/basictaskcommon.go index 166270962..014d640ba 100644 --- a/executor/executable/basictaskcommon.go +++ b/executor/executable/basictaskcommon.go @@ -28,12 +28,13 @@ import ( "bytes" "encoding/json" "errors" - "github.com/AliceO2Group/Control/common/utils" "io" "os/exec" "syscall" "time" + "github.com/AliceO2Group/Control/common/utils" + "github.com/AliceO2Group/Control/common/controlmode" "github.com/AliceO2Group/Control/common/event" "github.com/AliceO2Group/Control/common/logger/infologger" @@ -73,6 +74,8 @@ func (t *basicTaskBase) startBasicTask() (err error) { var errStdout, errStderr error var stdoutBuf, stderrBuf bytes.Buffer var stdout, stderr io.Writer + // To be closed after task is done + var stdoutLog, stderrLog *io.PipeWriter if t.Tci.Stdout == nil { none := "none" @@ -85,7 +88,7 @@ func (t *basicTaskBase) startBasicTask() (err error) { switch *t.Tci.Stdout { case "stdout": - stdoutLog := log.WithPrefix("task-stdout"). + stdoutLog = log.WithPrefix("task-stdout"). WithField("level", infologger.IL_Support). WithField("partition", t.knownEnvironmentId.String()). WithField("detector", t.knownDetector). @@ -97,7 +100,7 @@ func (t *basicTaskBase) startBasicTask() (err error) { stdout = io.MultiWriter(stdoutLog, &stdoutBuf) case "all": - stdoutLog := log.WithPrefix("task-stdout"). + stdoutLog = log.WithPrefix("task-stdout"). WithField("level", infologger.IL_Support). WithField("partition", t.knownEnvironmentId.String()). WithField("detector", t.knownDetector). 
@@ -112,7 +115,7 @@ func (t *basicTaskBase) startBasicTask() (err error) { switch *t.Tci.Stderr { case "stdout": - stderrLog := log.WithPrefix("task-stderr"). + stderrLog = log.WithPrefix("task-stderr"). WithField("level", infologger.IL_Support). WithField("partition", t.knownEnvironmentId.String()). WithField("detector", t.knownDetector). @@ -124,7 +127,7 @@ func (t *basicTaskBase) startBasicTask() (err error) { stderr = io.MultiWriter(stderrLog, &stderrBuf) case "all": - stderrLog := log.WithPrefix("task-stderr"). + stderrLog = log.WithPrefix("task-stderr"). WithField("level", infologger.IL_Support). WithField("partition", t.knownEnvironmentId.String()). WithField("detector", t.knownDetector). @@ -142,7 +145,6 @@ func (t *basicTaskBase) startBasicTask() (err error) { stderrIn, _ := t.taskCmd.StderrPipe() err = t.taskCmd.Start() - if err != nil { log.WithField("partition", t.knownEnvironmentId.String()). WithFields(logrus.Fields{ @@ -172,6 +174,13 @@ func (t *basicTaskBase) startBasicTask() (err error) { err = taskCmd.Wait() // ^ when this unblocks, the task is done + if stdoutLog != nil { + stdoutLog.Close() + } + if stderrLog != nil { + stderrLog.Close() + } + pendingState := mesos.TASK_FINISHED var tciCommandStr string if t.Tci.Value != nil { From 2317e45f37e17cbd3543c2ec0e7f6e404ea04c4a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Tich=C3=A1k?= Date: Wed, 10 Dec 2025 16:07:58 +0100 Subject: [PATCH 02/16] [core] part of OCTRL-1076, fixed leaked controllable task Kill goroutine --- executor/executable/controllabletask.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/executor/executable/controllabletask.go b/executor/executable/controllabletask.go index e45256bdc..ff8d162f0 100644 --- a/executor/executable/controllabletask.go +++ b/executor/executable/controllabletask.go @@ -728,8 +728,8 @@ func (t *ControllableTask) Kill() error { }). 
Debug("state DONE not reached, about to commit transition") - // Call cmd.Commit() asynchronous - commitDone := make(chan *CommitResponse) + // Call cmd.Commit() asynchronously with a buffered channel so it doesn't get stuck in case of a timeout + commitDone := make(chan *CommitResponse, 1) go func() { var cr CommitResponse cr.newState, cr.transitionError = cmd.Commit() @@ -856,7 +856,6 @@ func (t *ControllableTask) doKill9(pid int) error { } func (t *ControllableTask) doTermIntKill(pid int) error { - killErrCh := make(chan error) go func() { log.WithField("partition", t.knownEnvironmentId.String()). From 6f582db526a95164b7fc6c56fc1ca4bc24661442 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Tich=C3=A1k?= Date: Thu, 11 Dec 2025 17:34:21 +0100 Subject: [PATCH 03/16] [core] part of OCTRL-1076, proper error handling in eventloop --- executor/eventloop.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/executor/eventloop.go b/executor/eventloop.go index 3c19ed251..5fe38d14c 100644 --- a/executor/eventloop.go +++ b/executor/eventloop.go @@ -69,6 +69,11 @@ func eventLoop(state *internalState, decoder encoding.Decoder, h events.Handler) case e := <-eventCh: log.Trace("EVENT LOOP about to handle event") err = h.HandleEvent(ctx, &e) + // if we get an error here we are stopping the eventloop before triggering another nextEventNotify + // so it won't get stuck on eventCh, errorCh + if err != nil { + break + } // Spawn a goroutine to wait for the next event go nextEventNotify(decoder, eventCh, errorCh) @@ -100,7 +105,7 @@ func sendFailedTasks(state *internalState) { delete(state.failedTasks, taskID) // If there aren't any failed and active tasks, we request to shutdown the executor.
if len(state.failedTasks) == 0 && len(state.activeTasks) == 0 { - //Originally state.shouldQuit = true but we want to keep the executor running + // Originally state.shouldQuit = true but we want to keep the executor running log.WithField("executorId", state.executor.ExecutorID).Info("task failure notified, no tasks present on executor") } } From a785ffc8d9fcbca56f8ab94827842a06b12eceb2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Tich=C3=A1k?= Date: Fri, 12 Dec 2025 11:01:15 +0100 Subject: [PATCH 04/16] [executor] properly closing pipes in basic common task when in error --- executor/executable/basictaskcommon.go | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/executor/executable/basictaskcommon.go b/executor/executable/basictaskcommon.go index 014d640ba..068752aef 100644 --- a/executor/executable/basictaskcommon.go +++ b/executor/executable/basictaskcommon.go @@ -155,6 +155,8 @@ func (t *basicTaskBase) startBasicTask() (err error) { }). Error("failed to run basic task") + closePipeWriters(stdoutLog, stderrLog) + return err } log.WithField("partition", t.knownEnvironmentId.String()). 
@@ -174,12 +176,7 @@ func (t *basicTaskBase) startBasicTask() (err error) { err = taskCmd.Wait() // ^ when this unblocks, the task is done - if stdoutLog != nil { - stdoutLog.Close() - } - if stderrLog != nil { - stderrLog.Close() - } + closePipeWriters(stdoutLog, stderrLog) pendingState := mesos.TASK_FINISHED var tciCommandStr string @@ -252,6 +249,15 @@ func (t *basicTaskBase) startBasicTask() (err error) { return err } +func closePipeWriters(stdoutLog *io.PipeWriter, stderrLog *io.PipeWriter) { + if stdoutLog != nil { + stdoutLog.Close() + } + if stderrLog != nil { + stderrLog.Close() + } +} + func (t *basicTaskBase) ensureBasicTaskKilled() (err error) { if t.taskCmd == nil { return nil From a5991508df117ef244ff97141d53f102c64dad9e Mon Sep 17 00:00:00 2001 From: Piotr Konopka Date: Wed, 10 Dec 2025 14:12:49 +0100 Subject: [PATCH 05/16] refactor logging in controllabletask we reduce boiler-plate by having a common set of log fields that are defined once. infologger.level constant is used instead of "level". "cmd" and "command" log fields were unified to "command". --- executor/executable/controllabletask.go | 481 ++++++++---------------- 1 file changed, 162 insertions(+), 319 deletions(-) diff --git a/executor/executable/controllabletask.go b/executor/executable/controllabletask.go index ff8d162f0..6f085655b 100644 --- a/executor/executable/controllabletask.go +++ b/executor/executable/controllabletask.go @@ -71,48 +71,39 @@ type CommitResponse struct { } func (t *ControllableTask) Launch() error { - log.WithFields(logrus.Fields{ + defaultLogFields := logrus.Fields{ "taskId": t.ti.TaskID.GetValue(), "taskName": t.ti.Name, - "level": infologger.IL_Devel, "partition": t.knownEnvironmentId.String(), "detector": t.knownDetector, - }).Debug("executor.ControllableTask.Launch begin") + } + + log.WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Devel). 
+ Debug("executor.ControllableTask.Launch begin") launchStartTime := time.Now() defer utils.TimeTrack(launchStartTime, "executor.ControllableTask.Launch", - log.WithFields(logrus.Fields{ - "taskId": t.ti.TaskID.GetValue(), - "taskName": t.ti.Name, - "level": infologger.IL_Devel, - "partition": t.knownEnvironmentId.String(), - "detector": t.knownDetector, - })) + log.WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Devel)) t.pendingFinalTaskStateCh = make(chan mesos.TaskState, 1) // we use this to receive a pending status update if the task was killed taskCmd, err := prepareTaskCmd(t.Tci) if err != nil { msg := "cannot build task command" - log.WithFields(logrus.Fields{ - "id": t.ti.TaskID.Value, - "task": t.ti.Name, - "error": err, - "partition": t.knownEnvironmentId.String(), - "detector": t.knownDetector, - }). + log.WithFields(defaultLogFields). + WithError(err). Error(msg) t.sendStatus(t.knownEnvironmentId, mesos.TASK_FAILED, msg+": "+err.Error()) return err } - log.WithField("payload", string(t.ti.GetData()[:])). - WithField("task", t.ti.Name). - WithField("level", infologger.IL_Devel). - WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). + log.WithFields(defaultLogFields). + WithField("payload", string(t.ti.GetData()[:])). + WithField(infologger.Level, infologger.IL_Devel). Debug("starting task asynchronously") // We fork out into a goroutine for the actual process management. @@ -121,14 +112,10 @@ func (t *ControllableTask) Launch() error { // via channels. go func() { truncatedCmd := executorutil.TruncateCommandBeforeTheLastPipe(t.Tci.GetValue(), 500) - log.WithFields(logrus.Fields{ - "cmd": truncatedCmd, - "taskId": t.ti.TaskID.GetValue(), - "taskName": t.ti.Name, - "level": infologger.IL_Devel, - "partition": t.knownEnvironmentId.String(), - "detector": t.knownDetector, - }).Debug("executor.ControllableTask.Launch.async begin") + log.WithFields(defaultLogFields). 
+ WithField("command", truncatedCmd). + WithField(infologger.Level, infologger.IL_Devel). + Debug("executor.ControllableTask.Launch.async begin") // Set up pipes for controlled process var errStdout, errStderr error @@ -137,36 +124,22 @@ func (t *ControllableTask) Launch() error { err = taskCmd.Start() if err != nil { - log.WithFields(logrus.Fields{ - "id": t.ti.TaskID.Value, - "task": t.ti.Name, - "error": err.Error(), - "command": truncatedCmd, - "partition": t.knownEnvironmentId.String(), - "detector": t.knownDetector, - }). + log.WithFields(defaultLogFields). + WithField("command", truncatedCmd). + WithError(err). Error("failed to run task") t.sendStatus(t.knownEnvironmentId, mesos.TASK_FAILED, err.Error()) _ = t.doTermIntKill(-taskCmd.Process.Pid) return } - log.WithField("id", t.ti.TaskID.Value). - WithField("task", t.ti.Name). - WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - Debug("task launched") + log.WithFields(defaultLogFields).Debug("task launched") utils.TimeTrack(launchStartTime, "executor.ControllableTask.Launch.async: Launch begin to taskCmd.Start() complete", - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "cmd": truncatedCmd, - "taskId": t.ti.TaskID.GetValue(), - "taskName": t.ti.Name, - "level": infologger.IL_Devel, - })) + log.WithFields(defaultLogFields). + WithField("command", truncatedCmd). + WithField(infologger.Level, infologger.IL_Devel)) if t.Tci.Stdout == nil { none := "none" @@ -181,10 +154,8 @@ func (t *ControllableTask) Launch() error { case "stdout": go func() { entry := log.WithPrefix("task-stdout"). - WithField("level", infologger.IL_Support). - WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("task", t.ti.Name). + WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Support). 
WithField("nohooks", true) writer := &logger.SafeLogrusWriter{ Entry: entry, @@ -196,10 +167,8 @@ func (t *ControllableTask) Launch() error { case "all": go func() { entry := log.WithPrefix("task-stdout"). - WithField("level", infologger.IL_Support). - WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("task", t.ti.Name) + WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Support) writer := &logger.SafeLogrusWriter{ Entry: entry, PrintFunc: entry.Debug, @@ -217,10 +186,8 @@ func (t *ControllableTask) Launch() error { case "stdout": go func() { entry := log.WithPrefix("task-stderr"). - WithField("level", infologger.IL_Support). - WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("task", t.ti.Name). + WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Support). WithField("nohooks", true) writer := &logger.SafeLogrusWriter{ Entry: entry, @@ -232,10 +199,8 @@ func (t *ControllableTask) Launch() error { case "all": go func() { entry := log.WithPrefix("task-stderr"). - WithField("level", infologger.IL_Support). - WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("task", t.ti.Name) + WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Support) writer := &logger.SafeLogrusWriter{ Entry: entry, PrintFunc: entry.Warn, @@ -249,19 +214,15 @@ func (t *ControllableTask) Launch() error { }() } - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "controlPort": t.Tci.ControlPort, - "controlMode": t.Tci.ControlMode.String(), - "task": t.ti.Name, - "id": t.ti.TaskID.Value, - "path": taskCmd.Path, - "argv": "[ " + strings.Join(taskCmd.Args, ", ") + " ]", - "argc": len(taskCmd.Args), - "level": infologger.IL_Devel, - }). 
- Debug("starting gRPC client") + log.WithFields(defaultLogFields). + WithFields(logrus.Fields{ + "controlPort": t.Tci.ControlPort, + "controlMode": t.Tci.ControlMode.String(), + "path": taskCmd.Path, + "argv": "[ " + strings.Join(taskCmd.Args, ", ") + " ]", + "argc": len(taskCmd.Args), + infologger.Level: infologger.IL_Devel, + }).Debug("starting gRPC client") controlTransport := executorcmd.ProtobufTransport for _, v := range taskCmd.Args { @@ -278,26 +239,15 @@ func (t *ControllableTask) Launch() error { t.Tci.ControlMode, controlTransport, log.WithPrefix("executorcmd"). - WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "id": t.ti.TaskID.Value, - "task": t.ti.Name, - "command": truncatedCmd, - }, - ), + WithFields(defaultLogFields). + WithField("command", truncatedCmd), ) if t.rpc == nil { err = errors.New("rpc client is nil") - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "id": t.ti.TaskID.Value, - "task": t.ti.Name, - "error": err.Error(), - "command": truncatedCmd, - }). - WithField("level", infologger.IL_Devel). + log.WithFields(defaultLogFields). + WithField("command", truncatedCmd). + WithError(err). + WithField(infologger.Level, infologger.IL_Devel). Error("could not start gRPC client") t.sendStatus(t.knownEnvironmentId, mesos.TASK_FAILED, err.Error()) @@ -308,60 +258,37 @@ func (t *ControllableTask) Launch() error { utils.TimeTrack(launchStartTime, "executor.ControllableTask.Launch.async: Launch begin to gRPC client dial success", - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "command": truncatedCmd, - "taskId": t.ti.TaskID.GetValue(), - "taskName": t.ti.Name, - "level": infologger.IL_Devel, - })) + log.WithFields(defaultLogFields). + WithField("command", truncatedCmd). 
+ WithField(infologger.Level, infologger.IL_Devel)) utils.TimeTrack(rpcDialStartTime, "executor.ControllableTask.Launch.async: gRPC client dial begin to gRPC client dial success", - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "command": truncatedCmd, - "taskId": t.ti.TaskID.GetValue(), - "taskName": t.ti.Name, - "level": infologger.IL_Devel, - })) + log.WithFields(defaultLogFields). + WithField("command", truncatedCmd). + WithField(infologger.Level, infologger.IL_Devel)) statePollingStartTime := time.Now() elapsed := 0 * time.Second for { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "id": t.ti.TaskID.Value, - "task": t.ti.Name, - "command": truncatedCmd, - "elapsed": elapsed.String(), - "level": infologger.IL_Devel, - }). + log.WithFields(defaultLogFields). + WithField("command", truncatedCmd). + WithField("elapsed", elapsed.String()). + WithField(infologger.Level, infologger.IL_Devel). Debug("polling task for IDLE state reached") response, err := t.rpc.GetState(context.TODO(), &pb.GetStateRequest{}, grpc.EmptyCallOption{}) if err != nil { log.WithError(err). - WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "state": response.GetState(), - "task": t.ti.Name, - "command": truncatedCmd, - }). + WithFields(defaultLogFields). + WithField("state", response.GetState()). + WithField("command", truncatedCmd). Info("cannot query task status") } else { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "state": response.GetState(), - "task": t.ti.Name, - "command": truncatedCmd, - "level": infologger.IL_Devel, - }). + log.WithFields(defaultLogFields). + WithField("state", response.GetState()). + WithField("command", truncatedCmd). 
+ WithField(infologger.Level, infologger.IL_Devel). Debug("task status queried") t.knownPid = int(response.GetPid()) } @@ -369,12 +296,9 @@ func (t *ControllableTask) Launch() error { reachedState := t.rpc.FromDeviceState(response.GetState()) if reachedState == "STANDBY" && err == nil { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("id", t.ti.TaskID.Value). - WithField("task", t.ti.Name). + log.WithFields(defaultLogFields). WithField("command", truncatedCmd). - WithField("level", infologger.IL_Devel). + WithField(infologger.Level, infologger.IL_Devel). Debug("task running and ready for control input") break } else if reachedState == "DONE" || reachedState == "ERROR" { @@ -385,24 +309,20 @@ func (t *ControllableTask) Launch() error { // of this process, so we must rely on the PGID of the containing shell pid = -t.rpc.TaskCmd.Process.Pid } - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("taskId", t.ti.Name). + log.WithFields(defaultLogFields). Debug("sending SIGKILL (9) to task") _ = syscall.Kill(pid, syscall.SIGKILL) _ = stdoutIn.Close() _ = stderrIn.Close() - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("task", t.ti.Name).Debug("task killed") + log.WithFields(defaultLogFields). + Debug("task killed") t.sendStatus(t.knownEnvironmentId, mesos.TASK_FAILED, "task reached wrong state on startup") return } else if elapsed >= startupTimeout { err = errors.New("timeout while waiting for task startup") - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("task", t.ti.Name).Error(err.Error()) + log.WithFields(defaultLogFields). 
+ Error(err.Error()) t.sendStatus(t.knownEnvironmentId, mesos.TASK_FAILED, err.Error()) _ = t.rpc.Close() t.rpc = nil @@ -412,9 +332,7 @@ func (t *ControllableTask) Launch() error { return } else { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("task", t.ti.Name). + log.WithFields(defaultLogFields). WithField("command", truncatedCmd). Debugf("task not ready yet, waiting %s", startupPollingInterval.String()) time.Sleep(startupPollingInterval) @@ -424,33 +342,21 @@ func (t *ControllableTask) Launch() error { utils.TimeTrack(launchStartTime, "executor.ControllableTask.Launch.async: Launch begin to gRPC state polling done", - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "command": truncatedCmd, - "taskId": t.ti.TaskID.GetValue(), - "taskName": t.ti.Name, - "level": infologger.IL_Devel, - })) + log.WithFields(defaultLogFields). + WithField("command", truncatedCmd). + WithField(infologger.Level, infologger.IL_Devel)) utils.TimeTrack(statePollingStartTime, "executor.ControllableTask.Launch.async: gRPC state polling begin to gRPC state polling done", - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "command": truncatedCmd, - "taskId": t.ti.TaskID.GetValue(), - "taskName": t.ti.Name, - "level": infologger.IL_Devel, - })) + log.WithFields(defaultLogFields). + WithField("command", truncatedCmd). + WithField(infologger.Level, infologger.IL_Devel)) // Set up event stream from task esc, err := t.rpc.EventStream(context.TODO(), &pb.EventStreamRequest{}, grpc.EmptyCallOption{}) if err != nil { - log.WithField("task", t.ti.Name). + log.WithFields(defaultLogFields). WithError(err). - WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). 
Error("cannot set up event stream from task") t.sendStatus(t.knownEnvironmentId, mesos.TASK_FAILED, err.Error()) _ = t.rpc.Close() @@ -465,22 +371,15 @@ func (t *ControllableTask) Launch() error { jsonEvent, err := json.Marshal(taskMessage) if err != nil { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("taskId", t.ti.TaskID.GetValue()). - WithField("taskName", t.ti.Name). + log.WithFields(defaultLogFields). WithError(err). Warning("error marshaling message") } else { t.sendMessage(jsonEvent) - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "command": truncatedCmd, - "taskId": t.ti.TaskID.GetValue(), - "taskName": t.ti.Name, - "level": infologger.IL_Devel, - }).Debug("executor.ControllableTask.Launch.async: TASK_RUNNING sent back to core") + log.WithFields(defaultLogFields). + WithField("command", truncatedCmd). + WithField(infologger.Level, infologger.IL_Devel). + Debug("executor.ControllableTask.Launch.async: TASK_RUNNING sent back to core") } // Process events from task in yet another goroutine @@ -492,32 +391,23 @@ func (t *ControllableTask) Launch() error { } for { if t.rpc == nil { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("taskId", deo.TaskId.GetValue()). - WithField("taskName", t.ti.Name). + log.WithFields(defaultLogFields). WithError(err). Debug("event stream done") break } esr, err := esc.Recv() if err == io.EOF { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("taskId", deo.TaskId.GetValue()). - WithField("taskName", t.ti.Name). + log.WithFields(defaultLogFields). WithError(err). Debug("event stream EOF") break } if err != nil { - log.WithError(err). - WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). 
+ log.WithFields(defaultLogFields). WithField("errorType", reflect.TypeOf(err)). - WithField("level", infologger.IL_Devel). - WithField("taskId", deo.TaskId.GetValue()). - WithField("taskName", t.ti.Name). + WithField(infologger.Level, infologger.IL_Devel). + WithError(err). Warning("error receiving event") if status.Code(err) == codes.Unavailable { break @@ -528,29 +418,18 @@ func (t *ControllableTask) Launch() error { deviceEvent := event.NewDeviceEvent(deo, ev.GetType()) if deviceEvent == nil { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("taskId", deo.TaskId.GetValue()). - WithField("taskName", t.ti.Name). + log.WithFields(defaultLogFields). Debug("nil DeviceEvent received (NULL_DEVICE_EVENT) - closing stream") break } else { - taskId := deo.TaskId.Value - if deviceEvent.GetType() == pb.DeviceEventType_END_OF_STREAM { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("taskId", taskId). - WithField("taskName", t.ti.Name). + log.WithFields(defaultLogFields). WithField("taskPid", t.knownPid). Debug("END_OF_STREAM DeviceEvent received - notifying environment") } else if ev.GetType() == pb.DeviceEventType_TASK_INTERNAL_ERROR { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("taskId", taskId). - WithField("taskName", t.ti.Name). + log.WithFields(defaultLogFields). WithField("taskPid", t.knownPid). - WithField("level", infologger.IL_Support). + WithField(infologger.Level, infologger.IL_Support). Warningf("task transitioned to ERROR on its own - notifying environment") } } @@ -562,31 +441,21 @@ func (t *ControllableTask) Launch() error { err = taskCmd.Wait() // ^ when this unblocks, the task is done - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). 
- WithFields(logrus.Fields{ - "id": t.ti.TaskID.Value, - "task": t.ti.Name, - "command": truncatedCmd, - "level": infologger.IL_Devel, - }).Debug("task done (taskCmd.Wait unblocks), preparing final update") + log.WithFields(defaultLogFields). + WithField("command", truncatedCmd). + WithField(infologger.Level, infologger.IL_Devel). + Debug("task done (taskCmd.Wait unblocks), preparing final update") pendingState := mesos.TASK_FINISHED if err != nil { taskClassName, _ := utils.ExtractTaskClassName(t.ti.Name) - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("level", infologger.IL_Ops). + log.WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Ops). Errorf("task '%s' terminated with error: %s", utils.TrimJitPrefix(taskClassName), err.Error()) - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "id": t.ti.TaskID.Value, - "task": t.ti.Name, - "command": truncatedCmd, - "error": err.Error(), - "level": infologger.IL_Devel, - }). + log.WithFields(defaultLogFields). + WithField("command", truncatedCmd). + WithField(infologger.Level, infologger.IL_Devel). + WithError(err). Error("task terminated with error (details):") pendingState = mesos.TASK_FAILED } @@ -599,42 +468,27 @@ func (t *ControllableTask) Launch() error { if t.rpc != nil { _ = t.rpc.Close() // NOTE: might return non-nil error, but we don't care much - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("taskId", t.ti.TaskID.GetValue()). - WithField("taskName", t.ti.Name). + log.WithFields(defaultLogFields). Debug("rpc client closed") t.rpc = nil - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("taskId", t.ti.TaskID.GetValue()). - WithField("taskName", t.ti.Name). + log.WithFields(defaultLogFields). 
Debug("rpc client removed") } if errStdout != nil || errStderr != nil { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "errStderr": errStderr, - "errStdout": errStdout, - "id": t.ti.TaskID.Value, - "task": t.ti.Name, - "command": truncatedCmd, - "level": infologger.IL_Devel, - }). + log.WithFields(defaultLogFields). + WithField("errStderr", errStderr). + WithField("errStdout", errStdout). + WithField("command", truncatedCmd). + WithField(infologger.Level, infologger.IL_Devel). Warning("failed to capture stdout or stderr of task") } t.sendStatus(t.knownEnvironmentId, pendingState, "") }() - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "task": t.ti.Name, - "level": infologger.IL_Devel, - }). + log.WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Devel). Debug("gRPC client starting, handler forked: executor.ControllableTask.Launch end") return nil } @@ -662,6 +516,13 @@ func (t *ControllableTask) Transition(cmd *executorcmd.ExecutorCommand_Transitio } func (t *ControllableTask) Kill() error { + defaultLogFields := logrus.Fields{ + "taskId": t.ti.TaskID.GetValue(), + "taskName": t.ti.Name, + "partition": t.knownEnvironmentId.String(), + "detector": t.knownDetector, + } + var ( pid = 0 reachedState = "UNKNOWN" // FIXME: should be LAUNCHING or similar @@ -670,11 +531,9 @@ func (t *ControllableTask) Kill() error { defer cancel() response, err := t.rpc.GetState(cxt, &pb.GetStateRequest{}, grpc.EmptyCallOption{}) if err == nil { // we successfully got the state from the task - log.WithField("nativeState", response.GetState()). - WithField("taskId", t.ti.GetTaskID()). - WithField("level", infologger.IL_Devel). - WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). + log.WithFields(defaultLogFields). 
+ WithField("nativeState", response.GetState()). + WithField(infologger.Level, infologger.IL_Devel). Debug("task status queried for upcoming soft kill") // NOTE: we acquire the transitioner-dependent STANDBY equivalent state @@ -717,14 +576,13 @@ func (t *ControllableTask) Kill() error { for reachedState != "DONE" { cmd := nextTransition(reachedState) - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). + log.WithFields(defaultLogFields). WithFields(logrus.Fields{ - "evt": cmd.Event, - "src": cmd.Source, - "dst": cmd.Destination, - "targetList": cmd.TargetList, - "level": infologger.IL_Devel, + "evt": cmd.Event, + "src": cmd.Source, + "dst": cmd.Destination, + "targetList": cmd.TargetList, + infologger.Level: infologger.IL_Devel, }). Debug("state DONE not reached, about to commit transition") @@ -741,10 +599,8 @@ func (t *ControllableTask) Kill() error { select { case commitResponse = <-commitDone: case <-time.After(KILL_TRANSITION_TIMEOUT): - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("task", t.ti.TaskID.Value). - WithField("level", infologger.IL_Devel). + log.WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Devel). Warn("teardown transition sequence timed out") } // timeout we should break @@ -752,29 +608,23 @@ func (t *ControllableTask) Kill() error { break } - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). + log.WithFields(defaultLogFields). WithField("newState", commitResponse.newState). WithError(commitResponse.transitionError). - WithField("task", t.ti.TaskID.Value). - WithField("level", infologger.IL_Devel). + WithField(infologger.Level, infologger.IL_Devel). Debug("transition committed") if commitResponse.transitionError != nil || len(cmd.Event) == 0 { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). 
+ log.WithFields(defaultLogFields). WithError(commitResponse.transitionError). - WithField("task", t.ti.TaskID.Value). - WithField("level", infologger.IL_Devel). + WithField(infologger.Level, infologger.IL_Devel). Warn("teardown transition sequence error") break } reachedState = commitResponse.newState } - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("task", t.ti.TaskID.Value). - WithField("level", infologger.IL_Devel). + log.WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Devel). Debug("teardown transition sequence done") pid = int(response.GetPid()) if pid == 0 { @@ -785,10 +635,8 @@ func (t *ControllableTask) Kill() error { // If GetState didn't succeed during this Kill code path, but might still have // at some earlier point during the lifetime of this task. // Either way, we might or might not have the true PID. - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). + log.WithFields(defaultLogFields). WithError(err). - WithField("taskId", t.ti.GetTaskID()). Warn("cannot query task status for graceful process termination") pid = t.knownPid if pid == 0 { @@ -802,9 +650,8 @@ func (t *ControllableTask) Kill() error { // terminate the shell that is wrapping the command, so we avoid using // negative PID is all other cases in order to allow FairMQ cleanup to // run. - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithError(err).WithField("taskId", t.ti.GetTaskID()). + log.WithFields(defaultLogFields). + WithError(err). Warn("task PID not known from task, using containing shell PGID") } } @@ -813,16 +660,12 @@ func (t *ControllableTask) Kill() error { t.rpc = nil if reachedState == "DONE" { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("taskId", t.ti.TaskID.Value). + log.WithFields(defaultLogFields). 
Debugf("task reached DONE, will wait %.1fs before terminating it", DONE_TIMEOUT.Seconds()) t.pendingFinalTaskStateCh <- mesos.TASK_FINISHED time.Sleep(DONE_TIMEOUT) } else { // something went wrong - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("taskId", t.ti.TaskID.Value). + log.WithFields(defaultLogFields). Debug("task died already or will be killed soon") t.pendingFinalTaskStateCh <- mesos.TASK_KILLED } @@ -830,25 +673,26 @@ func (t *ControllableTask) Kill() error { if pidExists(pid) { return t.doTermIntKill(pid) } else { - log.WithField("taskId", t.ti.GetTaskID()). - WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). + log.WithFields(defaultLogFields). Debugf("task terminated on its own") return nil } } func (t *ControllableTask) doKill9(pid int) error { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("taskId", t.ti.GetTaskID()). + defaultLogFields := logrus.Fields{ + "taskId": t.ti.TaskID.GetValue(), + "taskName": t.ti.Name, + "partition": t.knownEnvironmentId.String(), + "detector": t.knownDetector, + } + + log.WithFields(defaultLogFields). Debug("sending SIGKILL (9) to task") killErr := syscall.Kill(pid, syscall.SIGKILL) if killErr != nil { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). + log.WithFields(defaultLogFields). WithError(killErr). - WithField("taskId", t.ti.GetTaskID()). Warning("task SIGKILL failed") } @@ -856,18 +700,21 @@ func (t *ControllableTask) doKill9(pid int) error { } func (t *ControllableTask) doTermIntKill(pid int) error { + defaultLogFields := logrus.Fields{ + "taskId": t.ti.TaskID.GetValue(), + "taskName": t.ti.Name, + "partition": t.knownEnvironmentId.String(), + "detector": t.knownDetector, + } + killErrCh := make(chan error) go func() { - log.WithField("partition", t.knownEnvironmentId.String()). 
- WithField("detector", t.knownDetector). - WithField("taskId", t.ti.GetTaskID()). + log.WithFields(defaultLogFields). Debug("sending SIGTERM (15) to task") err := syscall.Kill(pid, syscall.SIGTERM) if err != nil { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). + log.WithFields(defaultLogFields). WithError(err). - WithField("taskId", t.ti.GetTaskID()). Warning("task SIGTERM failed") } killErrCh <- err @@ -882,16 +729,12 @@ func (t *ControllableTask) doTermIntKill(pid int) error { if pidExists(pid) { // SIGINT for the "Waiting for graceful device shutdown. // Hit Ctrl-C again to abort immediately" message. - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("taskId", t.ti.GetTaskID()). + log.WithFields(defaultLogFields). Debug("sending SIGINT (2) to task") killErr = syscall.Kill(pid, syscall.SIGINT) if killErr != nil { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). + log.WithFields(defaultLogFields). WithError(killErr). - WithField("taskId", t.ti.GetTaskID()). Warning("task SIGINT failed") } time.Sleep(SIGINT_TIMEOUT) From 232fc2124ab90f9f699d1502a327813c9373b690 Mon Sep 17 00:00:00 2001 From: Piotr Konopka Date: Thu, 11 Dec 2025 12:43:33 +0100 Subject: [PATCH 06/16] Refactor controllable task Launch Launch is broken into smaller pieces for better readability. Some minor error handling bugs are automatically fixed on the occasion and some more are marked with fixme and will be taken care of in separate commits. 
--- executor/executable/controllabletask.go | 671 +++++++++++++----------- 1 file changed, 351 insertions(+), 320 deletions(-) diff --git a/executor/executable/controllabletask.go b/executor/executable/controllabletask.go index 6f085655b..72bbf7374 100644 --- a/executor/executable/controllabletask.go +++ b/executor/executable/controllabletask.go @@ -29,6 +29,7 @@ import ( "encoding/json" "errors" "io" + "os/exec" "reflect" "strings" "syscall" @@ -111,108 +112,54 @@ func (t *ControllableTask) Launch() error { // Anything in the following goroutine must not touch *internalState, except // via channels. go func() { - truncatedCmd := executorutil.TruncateCommandBeforeTheLastPipe(t.Tci.GetValue(), 500) - log.WithFields(defaultLogFields). - WithField("command", truncatedCmd). - WithField(infologger.Level, infologger.IL_Devel). - Debug("executor.ControllableTask.Launch.async begin") + t.doLaunchTask(taskCmd, launchStartTime) + }() - // Set up pipes for controlled process - var errStdout, errStderr error - stdoutIn, _ := taskCmd.StdoutPipe() - stderrIn, _ := taskCmd.StderrPipe() + log.WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Devel). + Debug("gRPC client starting, handler forked: executor.ControllableTask.Launch end") + return nil +} - err = taskCmd.Start() - if err != nil { - log.WithFields(defaultLogFields). - WithField("command", truncatedCmd). - WithError(err). 
- Error("failed to run task") +func (t *ControllableTask) doLaunchTask(taskCmd *exec.Cmd, launchStartTime time.Time) { + defaultLogFields := logrus.Fields{ + "taskId": t.ti.TaskID.GetValue(), + "taskName": t.ti.Name, + "partition": t.knownEnvironmentId.String(), + "detector": t.knownDetector, + } + truncatedCmd := executorutil.TruncateCommandBeforeTheLastPipe(t.Tci.GetValue(), 500) - t.sendStatus(t.knownEnvironmentId, mesos.TASK_FAILED, err.Error()) - _ = t.doTermIntKill(-taskCmd.Process.Pid) - return - } - log.WithFields(defaultLogFields).Debug("task launched") + log.WithFields(defaultLogFields). + WithField("command", truncatedCmd). + WithField(infologger.Level, infologger.IL_Devel). + Debug("executor.ControllableTask.Launch.async begin") - utils.TimeTrack(launchStartTime, - "executor.ControllableTask.Launch.async: Launch begin to taskCmd.Start() complete", - log.WithFields(defaultLogFields). - WithField("command", truncatedCmd). - WithField(infologger.Level, infologger.IL_Devel)) + // Set up pipes for controlled process. They have to be retrieved before starting the task. + stdoutIn, _ := taskCmd.StdoutPipe() + stderrIn, _ := taskCmd.StderrPipe() - if t.Tci.Stdout == nil { - none := "none" - t.Tci.Stdout = &none - } - if t.Tci.Stderr == nil { - none := "none" - t.Tci.Stderr = &none - } + err := taskCmd.Start() + if err != nil { + log.WithFields(defaultLogFields). + WithField("command", truncatedCmd). + WithError(err). + Error("failed to run task") - switch *t.Tci.Stdout { - case "stdout": - go func() { - entry := log.WithPrefix("task-stdout"). - WithFields(defaultLogFields). - WithField(infologger.Level, infologger.IL_Support). - WithField("nohooks", true) - writer := &logger.SafeLogrusWriter{ - Entry: entry, - PrintFunc: entry.Debug, - } - _, errStdout = io.Copy(writer, stdoutIn) - writer.Flush() - }() - case "all": - go func() { - entry := log.WithPrefix("task-stdout"). - WithFields(defaultLogFields). 
- WithField(infologger.Level, infologger.IL_Support) - writer := &logger.SafeLogrusWriter{ - Entry: entry, - PrintFunc: entry.Debug, - } - _, errStdout = io.Copy(writer, stdoutIn) - writer.Flush() - }() - default: - go func() { - _, errStdout = io.Copy(io.Discard, stdoutIn) - }() - } + t.sendStatus(t.knownEnvironmentId, mesos.TASK_FAILED, err.Error()) + // fixme: i confirmed on staging, that's a nil access! taskCmd.Process is not set if Start() fails + _ = t.doTermIntKill(-taskCmd.Process.Pid) + // fixme: shouldn't we also close pipes, as we do in some other error cases later? + return + } + log.WithFields(defaultLogFields).Debug("task launched") - switch *t.Tci.Stderr { - case "stdout": - go func() { - entry := log.WithPrefix("task-stderr"). - WithFields(defaultLogFields). - WithField(infologger.Level, infologger.IL_Support). - WithField("nohooks", true) - writer := &logger.SafeLogrusWriter{ - Entry: entry, - PrintFunc: entry.Warn, - } - _, errStderr = io.Copy(writer, stderrIn) - writer.Flush() - }() - case "all": - go func() { - entry := log.WithPrefix("task-stderr"). - WithFields(defaultLogFields). - WithField(infologger.Level, infologger.IL_Support) - writer := &logger.SafeLogrusWriter{ - Entry: entry, - PrintFunc: entry.Warn, - } - _, errStderr = io.Copy(writer, stderrIn) - writer.Flush() - }() - default: - go func() { - _, errStderr = io.Copy(io.Discard, stderrIn) - }() - } + utils.TimeTrack(launchStartTime, + "executor.ControllableTask.Launch.async: Launch begin to taskCmd.Start() complete", + log.WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Devel)) + + t.initTaskStdLogging(stdoutIn, stderrIn) log.WithFields(defaultLogFields). 
WithFields(logrus.Fields{ @@ -224,273 +171,357 @@ func (t *ControllableTask) Launch() error { infologger.Level: infologger.IL_Devel, }).Debug("starting gRPC client") - controlTransport := executorcmd.ProtobufTransport - for _, v := range taskCmd.Args { - if strings.Contains(v, "-P OCClite") { - controlTransport = executorcmd.JsonTransport - break - } + controlTransport := executorcmd.ProtobufTransport + for _, v := range taskCmd.Args { + if strings.Contains(v, "-P OCClite") { + controlTransport = executorcmd.JsonTransport + break } + } - rpcDialStartTime := time.Now() + rpcDialStartTime := time.Now() + t.rpc = executorcmd.NewClient( + t.Tci.ControlPort, + t.Tci.ControlMode, + controlTransport, + log.WithPrefix("executorcmd"). + WithFields(defaultLogFields), + ) + if t.rpc == nil { + err = errors.New("rpc client is nil") + log.WithFields(defaultLogFields). + WithField("command", truncatedCmd). + WithError(err). + WithField(infologger.Level, infologger.IL_Devel). + Error("could not start gRPC client") - t.rpc = executorcmd.NewClient( - t.Tci.ControlPort, - t.Tci.ControlMode, - controlTransport, - log.WithPrefix("executorcmd"). - WithFields(defaultLogFields). - WithField("command", truncatedCmd), - ) - if t.rpc == nil { - err = errors.New("rpc client is nil") - log.WithFields(defaultLogFields). - WithField("command", truncatedCmd). - WithError(err). - WithField(infologger.Level, infologger.IL_Devel). - Error("could not start gRPC client") + t.sendStatus(t.knownEnvironmentId, mesos.TASK_FAILED, err.Error()) + _ = t.doTermIntKill(-taskCmd.Process.Pid) + return + } + t.rpc.TaskCmd = taskCmd - t.sendStatus(t.knownEnvironmentId, mesos.TASK_FAILED, err.Error()) - _ = t.doTermIntKill(-taskCmd.Process.Pid) - return + utils.TimeTrack(launchStartTime, + "executor.ControllableTask.Launch.async: Launch begin to gRPC client dial success", + log.WithFields(defaultLogFields). 
+ WithField(infologger.Level, infologger.IL_Devel)) + + utils.TimeTrack(rpcDialStartTime, + "executor.ControllableTask.Launch.async: gRPC client dial begin to gRPC client dial success", + log.WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Devel)) + + err = t.pollTaskForStandbyState() + if err != nil { + t.sendStatus(t.knownEnvironmentId, mesos.TASK_FAILED, err.Error()) + + _ = t.rpc.Close() + t.rpc = nil + + pid := t.knownPid + if pid == 0 { + // The pid was never known through a successful `GetState` in the lifetime + // of this process, so we must rely on the PGID of the containing shell + pid = -taskCmd.Process.Pid } - t.rpc.TaskCmd = taskCmd + log.WithFields(defaultLogFields). + Debug("sending SIGKILL (9) to task") + _ = syscall.Kill(pid, syscall.SIGKILL) // fixme: not sure why we do it differently than elsewhere (doTermIntKill) + _ = stdoutIn.Close() + _ = stderrIn.Close() - utils.TimeTrack(launchStartTime, - "executor.ControllableTask.Launch.async: Launch begin to gRPC client dial success", - log.WithFields(defaultLogFields). - WithField("command", truncatedCmd). - WithField(infologger.Level, infologger.IL_Devel)) + log.WithFields(defaultLogFields). + Debug("task killed") + return + } - utils.TimeTrack(rpcDialStartTime, - "executor.ControllableTask.Launch.async: gRPC client dial begin to gRPC client dial success", - log.WithFields(defaultLogFields). - WithField("command", truncatedCmd). - WithField(infologger.Level, infologger.IL_Devel)) + utils.TimeTrack(launchStartTime, + "executor.ControllableTask.Launch.async: Launch begin to gRPC state polling done", + log.WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Devel)) - statePollingStartTime := time.Now() - elapsed := 0 * time.Second - for { - log.WithFields(defaultLogFields). - WithField("command", truncatedCmd). - WithField("elapsed", elapsed.String()). - WithField(infologger.Level, infologger.IL_Devel). 
- Debug("polling task for IDLE state reached") - - response, err := t.rpc.GetState(context.TODO(), &pb.GetStateRequest{}, grpc.EmptyCallOption{}) - if err != nil { - log.WithError(err). - WithFields(defaultLogFields). - WithField("state", response.GetState()). - WithField("command", truncatedCmd). - Info("cannot query task status") - } else { - log.WithFields(defaultLogFields). - WithField("state", response.GetState()). - WithField("command", truncatedCmd). - WithField(infologger.Level, infologger.IL_Devel). - Debug("task status queried") - t.knownPid = int(response.GetPid()) - } - // NOTE: we acquire the transitioner-dependent STANDBY equivalent state - reachedState := t.rpc.FromDeviceState(response.GetState()) + // Set up event stream from task + esc, err := t.rpc.EventStream(context.TODO(), &pb.EventStreamRequest{}, grpc.EmptyCallOption{}) + if err != nil { + log.WithFields(defaultLogFields). + WithError(err). + Error("cannot set up event stream from task") + t.sendStatus(t.knownEnvironmentId, mesos.TASK_FAILED, err.Error()) + _ = t.rpc.Close() + t.rpc = nil + // fixme: why don't we kill the task in this error case, but we do in others? + return + } - if reachedState == "STANDBY" && err == nil { - log.WithFields(defaultLogFields). - WithField("command", truncatedCmd). - WithField(infologger.Level, infologger.IL_Devel). - Debug("task running and ready for control input") - break - } else if reachedState == "DONE" || reachedState == "ERROR" { - // something went wrong, the device moved to DONE or ERROR on startup - pid := t.knownPid - if pid == 0 { - // The pid was never known through a successful `GetState` in the lifetime - // of this process, so we must rely on the PGID of the containing shell - pid = -t.rpc.TaskCmd.Process.Pid - } - log.WithFields(defaultLogFields). 
- Debug("sending SIGKILL (9) to task") - _ = syscall.Kill(pid, syscall.SIGKILL) - _ = stdoutIn.Close() - _ = stderrIn.Close() + // send RUNNING + t.sendStatus(t.knownEnvironmentId, mesos.TASK_RUNNING, "") + taskMessage := event.NewAnnounceTaskPIDEvent(t.ti.TaskID.GetValue(), int32(t.knownPid)) + taskMessage.SetLabels(map[string]string{"detector": t.knownDetector, "environmentId": t.knownEnvironmentId.String()}) - log.WithFields(defaultLogFields). - Debug("task killed") - t.sendStatus(t.knownEnvironmentId, mesos.TASK_FAILED, "task reached wrong state on startup") - return - } else if elapsed >= startupTimeout { - err = errors.New("timeout while waiting for task startup") - log.WithFields(defaultLogFields). - Error(err.Error()) - t.sendStatus(t.knownEnvironmentId, mesos.TASK_FAILED, err.Error()) - _ = t.rpc.Close() - t.rpc = nil + jsonEvent, err := json.Marshal(taskMessage) + if err != nil { + log.WithFields(defaultLogFields). + WithError(err). + Warning("error marshaling message") + } else { + t.sendMessage(jsonEvent) + log.WithFields(defaultLogFields). + WithField("command", truncatedCmd). + WithField(infologger.Level, infologger.IL_Devel). + Debug("executor.ControllableTask.Launch.async: TASK_RUNNING sent back to core") + } + + // Process events from task in yet another goroutine + go func() { + t.processEventsFromTask(esc) + }() - _ = stdoutIn.Close() - _ = stderrIn.Close() + err = taskCmd.Wait() + // ^ when this unblocks, the task is done + log.WithFields(defaultLogFields). + WithField("command", truncatedCmd). + WithField(infologger.Level, infologger.IL_Devel). + Debug("task done (taskCmd.Wait unblocks), preparing final update") - return - } else { - log.WithFields(defaultLogFields). - WithField("command", truncatedCmd). 
- Debugf("task not ready yet, waiting %s", startupPollingInterval.String()) - time.Sleep(startupPollingInterval) - elapsed += startupPollingInterval - } - } + pendingState := mesos.TASK_FINISHED + if err != nil { + taskClassName, _ := utils.ExtractTaskClassName(t.ti.Name) + log.WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Ops). + Errorf("task '%s' terminated with error: %s", utils.TrimJitPrefix(taskClassName), err.Error()) + log.WithFields(defaultLogFields). + WithField("command", truncatedCmd). + WithField(infologger.Level, infologger.IL_Devel). + WithError(err). + Error("task terminated with error (details):") + pendingState = mesos.TASK_FAILED + } - utils.TimeTrack(launchStartTime, - "executor.ControllableTask.Launch.async: Launch begin to gRPC state polling done", - log.WithFields(defaultLogFields). - WithField("command", truncatedCmd). - WithField(infologger.Level, infologger.IL_Devel)) + select { + case pending := <-t.pendingFinalTaskStateCh: + pendingState = pending + default: + } - utils.TimeTrack(statePollingStartTime, - "executor.ControllableTask.Launch.async: gRPC state polling begin to gRPC state polling done", - log.WithFields(defaultLogFields). - WithField("command", truncatedCmd). - WithField(infologger.Level, infologger.IL_Devel)) + if t.rpc != nil { + _ = t.rpc.Close() // NOTE: might return non-nil error, but we don't care much + log.WithFields(defaultLogFields). + Debug("rpc client closed") + t.rpc = nil + log.WithFields(defaultLogFields). + Debug("rpc client removed") + } - // Set up event stream from task - esc, err := t.rpc.EventStream(context.TODO(), &pb.EventStreamRequest{}, grpc.EmptyCallOption{}) - if err != nil { - log.WithFields(defaultLogFields). - WithError(err). 
- Error("cannot set up event stream from task") - t.sendStatus(t.knownEnvironmentId, mesos.TASK_FAILED, err.Error()) - _ = t.rpc.Close() - t.rpc = nil - return - } + t.sendStatus(t.knownEnvironmentId, pendingState, "") + return +} - // send RUNNING - t.sendStatus(t.knownEnvironmentId, mesos.TASK_RUNNING, "") - taskMessage := event.NewAnnounceTaskPIDEvent(t.ti.TaskID.GetValue(), int32(t.knownPid)) - taskMessage.SetLabels(map[string]string{"detector": t.knownDetector, "environmentId": t.knownEnvironmentId.String()}) +func (t *ControllableTask) initTaskStdLogging(stdoutIn io.ReadCloser, stderrIn io.ReadCloser) { + defaultLogFields := logrus.Fields{ + "taskId": t.ti.TaskID.GetValue(), + "taskName": t.ti.Name, + "partition": t.knownEnvironmentId.String(), + "detector": t.knownDetector, + } - jsonEvent, err := json.Marshal(taskMessage) - if err != nil { - log.WithFields(defaultLogFields). - WithError(err). - Warning("error marshaling message") - } else { - t.sendMessage(jsonEvent) + if t.Tci.Stdout == nil { + none := "none" + t.Tci.Stdout = &none + } + if t.Tci.Stderr == nil { + none := "none" + t.Tci.Stderr = &none + } + + go func() { + var errStdout error + switch *t.Tci.Stdout { + case "stdout": + entry := log.WithPrefix("task-stdout"). + WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Support). + WithField("nohooks", true) + writer := &logger.SafeLogrusWriter{ + Entry: entry, + PrintFunc: entry.Debug, + } + _, errStdout = io.Copy(writer, stdoutIn) + writer.Flush() + case "all": + entry := log.WithPrefix("task-stdout"). + WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Support) + writer := &logger.SafeLogrusWriter{ + Entry: entry, + PrintFunc: entry.Debug, + } + _, errStdout = io.Copy(writer, stdoutIn) + writer.Flush() + default: + _, errStdout = io.Copy(io.Discard, stdoutIn) + } + if errStdout != nil { log.WithFields(defaultLogFields). - WithField("command", truncatedCmd). + WithError(errStdout). 
WithField(infologger.Level, infologger.IL_Devel). - Debug("executor.ControllableTask.Launch.async: TASK_RUNNING sent back to core") + Warning("failed to capture stdout of task") } + }() - // Process events from task in yet another goroutine - go func() { - deo := event.DeviceEventOrigin{ - AgentId: t.ti.AgentID, - ExecutorId: t.ti.GetExecutor().ExecutorID, - TaskId: t.ti.TaskID, + go func() { + var errStderr error + switch *t.Tci.Stderr { + case "stdout": + entry := log.WithPrefix("task-stderr"). + WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Support). + WithField("nohooks", true) + writer := &logger.SafeLogrusWriter{ + Entry: entry, + PrintFunc: entry.Warn, } - for { - if t.rpc == nil { - log.WithFields(defaultLogFields). - WithError(err). - Debug("event stream done") - break - } - esr, err := esc.Recv() - if err == io.EOF { - log.WithFields(defaultLogFields). - WithError(err). - Debug("event stream EOF") - break - } - if err != nil { - log.WithFields(defaultLogFields). - WithField("errorType", reflect.TypeOf(err)). - WithField(infologger.Level, infologger.IL_Devel). - WithError(err). - Warning("error receiving event") - if status.Code(err) == codes.Unavailable { - break - } - continue - } - ev := esr.GetEvent() - - deviceEvent := event.NewDeviceEvent(deo, ev.GetType()) - if deviceEvent == nil { - log.WithFields(defaultLogFields). - Debug("nil DeviceEvent received (NULL_DEVICE_EVENT) - closing stream") - break - } else { - if deviceEvent.GetType() == pb.DeviceEventType_END_OF_STREAM { - log.WithFields(defaultLogFields). - WithField("taskPid", t.knownPid). - Debug("END_OF_STREAM DeviceEvent received - notifying environment") - } else if ev.GetType() == pb.DeviceEventType_TASK_INTERNAL_ERROR { - log.WithFields(defaultLogFields). - WithField("taskPid", t.knownPid). - WithField(infologger.Level, infologger.IL_Support). 
- Warningf("task transitioned to ERROR on its own - notifying environment") - } - } - deviceEvent.SetLabels(map[string]string{"detector": t.knownDetector, "environmentId": t.knownEnvironmentId.String()}) - - t.sendDeviceEvent(t.knownEnvironmentId, deviceEvent) + _, errStderr = io.Copy(writer, stderrIn) + writer.Flush() + case "all": + entry := log.WithPrefix("task-stderr"). + WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Support) + writer := &logger.SafeLogrusWriter{ + Entry: entry, + PrintFunc: entry.Warn, } - }() + _, errStderr = io.Copy(writer, stderrIn) + writer.Flush() + default: + _, errStderr = io.Copy(io.Discard, stderrIn) + } + if errStderr != nil { + log.WithFields(defaultLogFields). + WithError(errStderr). + WithField(infologger.Level, infologger.IL_Devel). + Warning("failed to capture stderr of task") + } + }() +} - err = taskCmd.Wait() - // ^ when this unblocks, the task is done +func (t *ControllableTask) pollTaskForStandbyState() error { + defaultLogFields := logrus.Fields{ + "taskId": t.ti.TaskID.GetValue(), + "taskName": t.ti.Name, + "partition": t.knownEnvironmentId.String(), + "detector": t.knownDetector, + } + statePollingStartTime := time.Now() + elapsed := 0 * time.Second + for { log.WithFields(defaultLogFields). - WithField("command", truncatedCmd). + WithField("elapsed", elapsed.String()). WithField(infologger.Level, infologger.IL_Devel). - Debug("task done (taskCmd.Wait unblocks), preparing final update") + Debug("polling task for STANDBY state reached") - pendingState := mesos.TASK_FINISHED + response, err := t.rpc.GetState(context.TODO(), &pb.GetStateRequest{}, grpc.EmptyCallOption{}) if err != nil { - taskClassName, _ := utils.ExtractTaskClassName(t.ti.Name) - log.WithFields(defaultLogFields). - WithField(infologger.Level, infologger.IL_Ops). - Errorf("task '%s' terminated with error: %s", utils.TrimJitPrefix(taskClassName), err.Error()) + log.WithError(err). + WithFields(defaultLogFields). 
+ WithField("state", response.GetState()). + Info("cannot query task status") + } else { log.WithFields(defaultLogFields). - WithField("command", truncatedCmd). + WithField("state", response.GetState()). WithField(infologger.Level, infologger.IL_Devel). - WithError(err). - Error("task terminated with error (details):") - pendingState = mesos.TASK_FAILED + Debug("task status queried") + t.knownPid = int(response.GetPid()) } + // NOTE: we acquire the transitioner-dependent STANDBY equivalent state + // fixme: that's a possible nil access there, because we do not "continue" on error + reachedState := t.rpc.FromDeviceState(response.GetState()) - select { - case pending := <-t.pendingFinalTaskStateCh: - pendingState = pending - default: + if reachedState == "STANDBY" && err == nil { + log.WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Devel). + Debug("task running and ready for control input") + break + } else if reachedState == "DONE" || reachedState == "ERROR" { + // something went wrong, the device moved to DONE or ERROR on startup + return errors.New("task reached wrong state on startup") + } else if elapsed >= startupTimeout { + return errors.New("timeout while trying to poll task") + } else { + log.WithFields(defaultLogFields). + Debugf("task not ready yet, waiting %s", startupPollingInterval.String()) + time.Sleep(startupPollingInterval) + elapsed += startupPollingInterval } + } - if t.rpc != nil { - _ = t.rpc.Close() // NOTE: might return non-nil error, but we don't care much + utils.TimeTrack(statePollingStartTime, + "executor.ControllableTask.Launch.async: gRPC state polling begin to gRPC state polling done", + log.WithFields(defaultLogFields). 
+ WithField(infologger.Level, infologger.IL_Devel)) + return nil +} + +func (t *ControllableTask) processEventsFromTask(esc pb.Occ_EventStreamClient) { + defaultLogFields := logrus.Fields{ + "taskId": t.ti.TaskID.GetValue(), + "taskName": t.ti.Name, + "partition": t.knownEnvironmentId.String(), + "detector": t.knownDetector, + } + deo := event.DeviceEventOrigin{ + AgentId: t.ti.AgentID, + ExecutorId: t.ti.GetExecutor().ExecutorID, + TaskId: t.ti.TaskID, + } + + for { + if t.rpc == nil { log.WithFields(defaultLogFields). - Debug("rpc client closed") - t.rpc = nil + Debug("event stream done") + break + } + esr, err := esc.Recv() + if err == io.EOF { log.WithFields(defaultLogFields). - Debug("rpc client removed") + WithError(err). + Debug("event stream EOF") + break } - - if errStdout != nil || errStderr != nil { + if err != nil { log.WithFields(defaultLogFields). - WithField("errStderr", errStderr). - WithField("errStdout", errStdout). - WithField("command", truncatedCmd). + WithField("errorType", reflect.TypeOf(err)). WithField(infologger.Level, infologger.IL_Devel). - Warning("failed to capture stdout or stderr of task") + WithError(err). + Warning("error receiving event") + if status.Code(err) == codes.Unavailable { + break + } + // fixme: we also get codes.Canceled sometimes, it's probably OK and we should not complain + continue } + ev := esr.GetEvent() - t.sendStatus(t.knownEnvironmentId, pendingState, "") - }() + deviceEvent := event.NewDeviceEvent(deo, ev.GetType()) + if deviceEvent == nil { + log.WithFields(defaultLogFields). + Debug("nil DeviceEvent received (NULL_DEVICE_EVENT) - closing stream") + break + } else { + if deviceEvent.GetType() == pb.DeviceEventType_END_OF_STREAM { + log.WithFields(defaultLogFields). + WithField("taskPid", t.knownPid). + Debug("END_OF_STREAM DeviceEvent received - notifying environment") + } else if ev.GetType() == pb.DeviceEventType_TASK_INTERNAL_ERROR { + log.WithFields(defaultLogFields). + WithField("taskPid", t.knownPid). 
+ WithField(infologger.Level, infologger.IL_Support). + Warningf("task transitioned to ERROR on its own - notifying environment") + } + } + deviceEvent.SetLabels(map[string]string{"detector": t.knownDetector, "environmentId": t.knownEnvironmentId.String()}) - log.WithFields(defaultLogFields). - WithField(infologger.Level, infologger.IL_Devel). - Debug("gRPC client starting, handler forked: executor.ControllableTask.Launch end") - return nil + t.sendDeviceEvent(t.knownEnvironmentId, deviceEvent) + } } func (t *ControllableTask) UnmarshalTransition(data []byte) (cmd *executorcmd.ExecutorCommand_Transition, err error) { From 5589d40df791ddbee225cb7d0be91d6bf70d0655 Mon Sep 17 00:00:00 2001 From: Piotr Konopka Date: Fri, 12 Dec 2025 12:01:58 +0100 Subject: [PATCH 07/16] do not attempt to kill a process which never started it's difficult to trigger such an error, but if taskCmd.Start() actually fails, it never sets taskCmd.Process, so trying to use the PID there is a nil access and causes executor to crash. --- executor/executable/controllabletask.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/executor/executable/controllabletask.go b/executor/executable/controllabletask.go index 72bbf7374..d0642e878 100644 --- a/executor/executable/controllabletask.go +++ b/executor/executable/controllabletask.go @@ -147,8 +147,6 @@ func (t *ControllableTask) doLaunchTask(taskCmd *exec.Cmd, launchStartTime time. Error("failed to run task") t.sendStatus(t.knownEnvironmentId, mesos.TASK_FAILED, err.Error()) - // fixme: i confirmed on staging, that's a nil access! taskCmd.Process is not set if Start() fails - _ = t.doTermIntKill(-taskCmd.Process.Pid) // fixme: shouldn't we also close pipes, as we do in some other error cases later? 
return } From 5d2bb0b3500a6975a65e5a6649181a424a6ecf34 Mon Sep 17 00:00:00 2001 From: Piotr Konopka Date: Fri, 12 Dec 2025 12:22:26 +0100 Subject: [PATCH 08/16] document that we don't need to close pipes if command fails to start --- executor/executable/controllabletask.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executor/executable/controllabletask.go b/executor/executable/controllabletask.go index d0642e878..ea3f16c3a 100644 --- a/executor/executable/controllabletask.go +++ b/executor/executable/controllabletask.go @@ -147,7 +147,7 @@ func (t *ControllableTask) doLaunchTask(taskCmd *exec.Cmd, launchStartTime time. Error("failed to run task") t.sendStatus(t.knownEnvironmentId, mesos.TASK_FAILED, err.Error()) - // fixme: shouldn't we also close pipes, as we do in some other error cases later? + // no need to close IO pipes, Cmd.Start does it on failure return } log.WithFields(defaultLogFields).Debug("task launched") From 46636baa54ae82a85ebb034ea82e7907d4d771f3 Mon Sep 17 00:00:00 2001 From: Piotr Konopka Date: Fri, 12 Dec 2025 12:33:31 +0100 Subject: [PATCH 09/16] be gentler with terminating tasks which fail to reach STANDBY If a task fails to report the STANDBY state upon startup, we now perform the standard TERM, INT, KILL sequence instead of just KILL. We also rely on Wait() to close the relevant pipes and provide us with the exit code. All of the above is moved into a dedicated method which will be reused in subsequent fixes. --- executor/executable/controllabletask.go | 64 ++++++++++++++++++------- 1 file changed, 47 insertions(+), 17 deletions(-) diff --git a/executor/executable/controllabletask.go b/executor/executable/controllabletask.go index ea3f16c3a..557e4f378 100644 --- a/executor/executable/controllabletask.go +++ b/executor/executable/controllabletask.go @@ -211,25 +211,14 @@ func (t *ControllableTask) doLaunchTask(taskCmd *exec.Cmd, launchStartTime time.
err = t.pollTaskForStandbyState() if err != nil { - t.sendStatus(t.knownEnvironmentId, mesos.TASK_FAILED, err.Error()) - - _ = t.rpc.Close() - t.rpc = nil - - pid := t.knownPid - if pid == 0 { - // The pid was never known through a successful `GetState` in the lifetime - // of this process, so we must rely on the PGID of the containing shell - pid = -taskCmd.Process.Pid - } log.WithFields(defaultLogFields). - Debug("sending SIGKILL (9) to task") - _ = syscall.Kill(pid, syscall.SIGKILL) // fixme: not sure why we do it differently than elsewhere (doTermIntKill) - _ = stdoutIn.Close() - _ = stderrIn.Close() + WithField(infologger.Level, infologger.IL_Support). + WithError(err). + Error("failed to poll task for standby state") - log.WithFields(defaultLogFields). - Debug("task killed") + t.sendStatus(t.knownEnvironmentId, mesos.TASK_FAILED, err.Error()) + + t.cleanupFailedTask(taskCmd) return } @@ -314,6 +303,47 @@ func (t *ControllableTask) doLaunchTask(taskCmd *exec.Cmd, launchStartTime time. return } +func (t *ControllableTask) cleanupFailedTask(taskCmd *exec.Cmd) { + + defaultLogFields := logrus.Fields{ + "taskId": t.ti.TaskID.GetValue(), + "taskName": t.ti.Name, + "partition": t.knownEnvironmentId.String(), + "detector": t.knownDetector, + } + + if taskCmd.Process == nil { + // task never started or was already terminated + return + } + + if t.rpc != nil { + _ = t.rpc.Close() + t.rpc = nil + } + + pid := t.knownPid + if pid == 0 { + // The pid was never known through a successful `GetState` in the lifetime + // of this process, so we must rely on the PGID of the containing shell + pid = -taskCmd.Process.Pid + } + + _ = t.doTermIntKill(-taskCmd.Process.Pid) + + err := taskCmd.Wait() + if err != nil { + log.WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Support). + WithError(err). + Warning("task terminated and exited with error") + } else { + log.WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Support). 
+ Debug("task terminated") + } +} + func (t *ControllableTask) initTaskStdLogging(stdoutIn io.ReadCloser, stderrIn io.ReadCloser) { defaultLogFields := logrus.Fields{ "taskId": t.ti.TaskID.GetValue(), From 24dae4b4b0ae77d48f7a94b305c424aa35a27617 Mon Sep 17 00:00:00 2001 From: Piotr Konopka Date: Fri, 12 Dec 2025 14:52:35 +0100 Subject: [PATCH 10/16] be consistent in cleaning up failed controllable task --- executor/executable/controllabletask.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/executor/executable/controllabletask.go b/executor/executable/controllabletask.go index 557e4f378..e17bd167b 100644 --- a/executor/executable/controllabletask.go +++ b/executor/executable/controllabletask.go @@ -194,7 +194,8 @@ func (t *ControllableTask) doLaunchTask(taskCmd *exec.Cmd, launchStartTime time. Error("could not start gRPC client") t.sendStatus(t.knownEnvironmentId, mesos.TASK_FAILED, err.Error()) - _ = t.doTermIntKill(-taskCmd.Process.Pid) + + t.cleanupFailedTask(taskCmd) return } t.rpc.TaskCmd = taskCmd @@ -234,9 +235,7 @@ func (t *ControllableTask) doLaunchTask(taskCmd *exec.Cmd, launchStartTime time. WithError(err). Error("cannot set up event stream from task") t.sendStatus(t.knownEnvironmentId, mesos.TASK_FAILED, err.Error()) - _ = t.rpc.Close() - t.rpc = nil - // fixme: why don't we kill the task in this error case, but we do in others? 
+ t.cleanupFailedTask(taskCmd) return } From e724c0ca4f9b0fd316c7fcf22c8c9c24c3ff2f3c Mon Sep 17 00:00:00 2001 From: Piotr Konopka Date: Fri, 12 Dec 2025 15:06:18 +0100 Subject: [PATCH 11/16] fix nil access while polling controllable task for STANDBY state --- executor/executable/controllabletask.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/executor/executable/controllabletask.go b/executor/executable/controllabletask.go index e17bd167b..efbd0df90 100644 --- a/executor/executable/controllabletask.go +++ b/executor/executable/controllabletask.go @@ -447,6 +447,7 @@ func (t *ControllableTask) pollTaskForStandbyState() error { Debug("polling task for STANDBY state reached") response, err := t.rpc.GetState(context.TODO(), &pb.GetStateRequest{}, grpc.EmptyCallOption{}) + reachedState := "UNKNOWN" if err != nil { log.WithError(err). WithFields(defaultLogFields). @@ -458,12 +459,11 @@ func (t *ControllableTask) pollTaskForStandbyState() error { WithField(infologger.Level, infologger.IL_Devel). Debug("task status queried") t.knownPid = int(response.GetPid()) + // NOTE: we acquire the transitioner-dependent STANDBY equivalent state + reachedState = t.rpc.FromDeviceState(response.GetState()) } - // NOTE: we acquire the transitioner-dependent STANDBY equivalent state - // fixme: that's a possible nil access there, because we do not "continue" on error - reachedState := t.rpc.FromDeviceState(response.GetState()) - if reachedState == "STANDBY" && err == nil { + if reachedState == "STANDBY" { log.WithFields(defaultLogFields). WithField(infologger.Level, infologger.IL_Devel). 
Debug("task running and ready for control input") From a10f2366dcbdec9ff00136d757a69fdf79f350d9 Mon Sep 17 00:00:00 2001 From: Piotr Konopka Date: Fri, 12 Dec 2025 16:30:53 +0100 Subject: [PATCH 12/16] Distinguish between task too slow to start and crashing on start Fixes OCTRL-1075 --- executor/executable/controllabletask.go | 32 ++++++++++++++++++------- executor/executorcmd/client.go | 6 +---- 2 files changed, 25 insertions(+), 13 deletions(-) diff --git a/executor/executable/controllabletask.go b/executor/executable/controllabletask.go index efbd0df90..c8794c7f0 100644 --- a/executor/executable/controllabletask.go +++ b/executor/executable/controllabletask.go @@ -63,6 +63,7 @@ type ControllableTask struct { taskBase rpc *executorcmd.RpcClient pendingFinalTaskStateCh chan mesos.TaskState + taskDoneCh chan error knownPid int } @@ -159,6 +160,12 @@ func (t *ControllableTask) doLaunchTask(taskCmd *exec.Cmd, launchStartTime time. t.initTaskStdLogging(stdoutIn, stderrIn) + // We start to Wait() for the result already, so we have access to ProcessState on an early failure + t.taskDoneCh = make(chan error, 1) + go func() { + t.taskDoneCh <- taskCmd.Wait() + }() + log.WithFields(defaultLogFields). WithFields(logrus.Fields{ "controlPort": t.Tci.ControlPort, @@ -186,12 +193,20 @@ func (t *ControllableTask) doLaunchTask(taskCmd *exec.Cmd, launchStartTime time. WithFields(defaultLogFields), ) if t.rpc == nil { - err = errors.New("rpc client is nil") - log.WithFields(defaultLogFields). - WithField("command", truncatedCmd). - WithError(err). - WithField(infologger.Level, infologger.IL_Devel). 
- Error("could not start gRPC client") + // Check if the task is still running by checking ProcessState + if taskCmd.ProcessState != nil { + err = errors.New("AliECS executor could not connect to task, likely crashed on startup") + } else { + err = errors.New("AliECS executor could not connect to task, likely took too long to start") + } + + taskClassName, _ := utils.ExtractTaskClassName(t.ti.Name) + log.WithFields(logrus.Fields{ + "task": utils.TrimJitPrefix(taskClassName), + "partition": t.knownEnvironmentId.String(), + "detector": t.knownDetector, + infologger.Level: infologger.IL_Ops, + }).Error(err.Error()) t.sendStatus(t.knownEnvironmentId, mesos.TASK_FAILED, err.Error()) @@ -262,7 +277,7 @@ func (t *ControllableTask) doLaunchTask(taskCmd *exec.Cmd, launchStartTime time. t.processEventsFromTask(esc) }() - err = taskCmd.Wait() + err = <-t.taskDoneCh // ^ when this unblocks, the task is done log.WithFields(defaultLogFields). WithField("command", truncatedCmd). @@ -330,7 +345,8 @@ func (t *ControllableTask) cleanupFailedTask(taskCmd *exec.Cmd) { _ = t.doTermIntKill(-taskCmd.Process.Pid) - err := taskCmd.Wait() + // Wait for task to finish and report the error + err := <-t.taskDoneCh if err != nil { log.WithFields(defaultLogFields). WithField(infologger.Level, infologger.IL_Support). diff --git a/executor/executorcmd/client.go b/executor/executorcmd/client.go index 738a7e521..bc9a2e7a8 100644 --- a/executor/executorcmd/client.go +++ b/executor/executorcmd/client.go @@ -80,12 +80,8 @@ func NewClient( log.WithField("error", err.Error()). WithField("endpoint", endpoint). WithField("transport", controlTransportS). - WithField("level", infologger.IL_Trace). + WithField("level", infologger.IL_Devel). Error("gRPC client can't dial") - log.WithField("error", err.Error()). - WithField("endpoint", endpoint). - WithField("level", infologger.IL_Ops). 
- Error("AliECS executor could not connect to task, possible crash on startup") cancel() if conn != nil { From 232d21b2be3517271478669160c141d7268aa866 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 16 Dec 2025 22:35:40 +0000 Subject: [PATCH 13/16] Bump github.com/expr-lang/expr from 1.17.0 to 1.17.7 Bumps [github.com/expr-lang/expr](https://github.com/expr-lang/expr) from 1.17.0 to 1.17.7. - [Release notes](https://github.com/expr-lang/expr/releases) - [Commits](https://github.com/expr-lang/expr/compare/v1.17.0...v1.17.7) --- updated-dependencies: - dependency-name: github.com/expr-lang/expr dependency-version: 1.17.7 dependency-type: direct:production ... Signed-off-by: dependabot[bot] --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 1fd24d1c8..90a854904 100644 --- a/go.mod +++ b/go.mod @@ -69,7 +69,7 @@ require ( require ( dario.cat/mergo v1.0.1 - github.com/expr-lang/expr v1.17.0 + github.com/expr-lang/expr v1.17.7 github.com/flosch/pongo2/v6 v6.0.0 github.com/gogo/protobuf v1.3.2 github.com/hashicorp/go-multierror v1.1.1 diff --git a/go.sum b/go.sum index a23533be8..c589d794f 100644 --- a/go.sum +++ b/go.sum @@ -61,8 +61,8 @@ github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ= github.com/envoyproxy/protoc-gen-validate v1.0.4 h1:gVPz/FMfvh57HdSJQyvBtF00j8JU4zdyUgIUNhlgg0A= github.com/envoyproxy/protoc-gen-validate v1.0.4/go.mod h1:qys6tmnRsYrQqIhm2bvKZH4Blx/1gTIZ2UKVY1M+Yew= -github.com/expr-lang/expr v1.17.0 h1:+vpszOyzKLQXC9VF+wA8cVA0tlA984/Wabc/1hF9Whg= -github.com/expr-lang/expr v1.17.0/go.mod h1:8/vRC7+7HBzESEqt5kKpYXxrxkr31SaO8r40VO/1IT4= +github.com/expr-lang/expr v1.17.7 h1:Q0xY/e/2aCIp8g9s/LGvMDCC5PxYlvHgDZRQ4y16JX8= +github.com/expr-lang/expr v1.17.7/go.mod h1:8/vRC7+7HBzESEqt5kKpYXxrxkr31SaO8r40VO/1IT4= 
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= From c66a3da44472fbad630143434c673a592cf9097d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Tich=C3=A1k?= Date: Tue, 13 Jan 2026 12:09:40 +0100 Subject: [PATCH 14/16] [coconut] fixed timestamp formatting in env list and show commands --- coconut/control/control.go | 4 ++-- coconut/control/controlutil.go | 7 +++---- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/coconut/control/control.go b/coconut/control/control.go index bc0983a8b..51b09ff15 100644 --- a/coconut/control/control.go +++ b/coconut/control/control.go @@ -252,7 +252,7 @@ func GetEnvironments(cxt context.Context, rpc *coconut.RpcClient, cmd *cobra.Com data := make([][]string, 0, 0) for _, envi := range response.GetEnvironments() { - formatted := formatTimestamp(envi.GetCreatedWhen()) + formatted := formatTimestamp(time.Millisecond*time.Duration(envi.GetCreatedWhen())) data = append(data, []string{envi.GetId(), envi.GetRootRole(), formatted, colorState(envi.GetState())}) } @@ -539,7 +539,7 @@ func ShowEnvironment(cxt context.Context, rpc *coconut.RpcClient, cmd *cobra.Com _, _ = fmt.Fprintf(o, "environment id: %s\n", env.GetId()) _, _ = fmt.Fprintf(o, "workflow template: %s\n", env.GetRootRole()) _, _ = fmt.Fprintf(o, "description: %s\n", env.GetDescription()) - _, _ = fmt.Fprintf(o, "created: %s\n", formatTimestamp(env.GetCreatedWhen())) + _, _ = fmt.Fprintf(o, "created: %s\n", formatTimestamp(time.Millisecond*time.Duration(env.GetCreatedWhen()))) _, _ = fmt.Fprintf(o, "state: %s\n", colorState(env.GetState())) if currentTransition := env.GetCurrentTransition(); len(currentTransition) != 0 { _, _ = fmt.Fprintf(o, "transition: %s\n", currentTransition) diff --git a/coconut/control/controlutil.go b/coconut/control/controlutil.go index 
7f6642e0b..5d85a5906 100644 --- a/coconut/control/controlutil.go +++ b/coconut/control/controlutil.go @@ -178,10 +178,9 @@ func drawTableShortTaskInfos(tasks []*pb.ShortTaskInfo, headers []string, linePr table.Render() } -func formatTimestamp(int64timestamp int64) string { - timestamp := time.Unix(0, int64timestamp) - formatted := timestamp.Local().Format("2006-01-02 15:04:05 MST") - return formatted +func formatTimestamp(timestamp time.Duration) string { + time := time.Unix(int64(timestamp.Seconds()), 0) + return time.Local().Format("2006-01-02 15:04:05 MST") } func formatNumber(numberOfMachines int32) string { From 5e79b372e0da5f23fbd78a900b1fa91fb18a1081 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Tich=C3=A1k?= Date: Tue, 13 Jan 2026 14:29:41 +0100 Subject: [PATCH 15/16] [build] Added debug build possibility to Makefile --- Makefile | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/Makefile b/Makefile index f59740768..88a426438 100644 --- a/Makefile +++ b/Makefile @@ -81,6 +81,11 @@ PROD :=-X=$(REPOPATH)/common/product EXTLDFLAGS :="-static" LDFLAGS=-ldflags "-extldflags $(EXTLDFLAGS) $(PROD).VERSION_MAJOR=$(VERSION_MAJOR) $(PROD).VERSION_MINOR=$(VERSION_MINOR) $(PROD).VERSION_PATCH=$(VERSION_PATCH) $(PROD).BUILD=$(BUILD)" -tags osusergo,netgo +# To invoke debug build you need to pass DEBUG variable with any value, eg: DEBUG=1 make WHAT=... 
+ifdef DEBUG + GCFLAGS += -gcflags="all=-N -l" +endif + # We expect to find the protoc-gen-go executable in $GOPATH/bin GOPATH := $(shell go env GOPATH) GOPROTOCPATH=$(ROOT_DIR)/tools/protoc-gen-go @@ -124,7 +129,7 @@ $(WHAT): validate-go-version # @echo -e "WHAT_$@_BUILD_FLAGS $(WHAT_$@_BUILD_FLAGS)" @echo -e "\033[1;33mgo build -mod=vendor\033[0m ./cmd/$@ \033[1;33m==>\033[0m \033[1;34m./bin/$@\033[0m" # @echo ${PWD} - @$(WHAT_$@_BUILD_FLAGS) go build -mod=vendor $(VERBOSE_$(V)) -o bin/$@ $(LDFLAGS) ./cmd/$@ + @$(WHAT_$@_BUILD_FLAGS) go build -mod=vendor $(VERBOSE_$(V)) $(GCFLAGS) -o bin/$@ $(LDFLAGS) ./cmd/$@ # special case: if the current WHAT is o2-aliecs-executor, also copy over the shmcleaner script @if [ $@ = "o2-aliecs-executor" ]; then \ From f03724f0c786ac0a50a6bf3e6279a37131a430eb Mon Sep 17 00:00:00 2001 From: Piotr Konopka Date: Thu, 15 Jan 2026 09:42:01 +0100 Subject: [PATCH 16/16] Bump to v1.47.0 --- VERSION | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/VERSION b/VERSION index 6a9804ddb..8bbf43827 100644 --- a/VERSION +++ b/VERSION @@ -1,4 +1,4 @@ # GNU Make syntax VERSION_MAJOR := 1 -VERSION_MINOR := 46 -VERSION_PATCH := 1 +VERSION_MINOR := 47 +VERSION_PATCH := 0