diff --git a/Makefile b/Makefile index f59740768..88a426438 100644 --- a/Makefile +++ b/Makefile @@ -81,6 +81,11 @@ PROD :=-X=$(REPOPATH)/common/product EXTLDFLAGS :="-static" LDFLAGS=-ldflags "-extldflags $(EXTLDFLAGS) $(PROD).VERSION_MAJOR=$(VERSION_MAJOR) $(PROD).VERSION_MINOR=$(VERSION_MINOR) $(PROD).VERSION_PATCH=$(VERSION_PATCH) $(PROD).BUILD=$(BUILD)" -tags osusergo,netgo +# To invoke debug build you need to pass DEBUG variable with any value, eg: DEBUG=1 make WHAT=... +ifdef DEBUG + GCFLAGS += -gcflags="all=-N -l" +endif + # We expect to find the protoc-gen-go executable in $GOPATH/bin GOPATH := $(shell go env GOPATH) GOPROTOCPATH=$(ROOT_DIR)/tools/protoc-gen-go @@ -124,7 +129,7 @@ $(WHAT): validate-go-version # @echo -e "WHAT_$@_BUILD_FLAGS $(WHAT_$@_BUILD_FLAGS)" @echo -e "\033[1;33mgo build -mod=vendor\033[0m ./cmd/$@ \033[1;33m==>\033[0m \033[1;34m./bin/$@\033[0m" # @echo ${PWD} - @$(WHAT_$@_BUILD_FLAGS) go build -mod=vendor $(VERBOSE_$(V)) -o bin/$@ $(LDFLAGS) ./cmd/$@ + @$(WHAT_$@_BUILD_FLAGS) go build -mod=vendor $(VERBOSE_$(V)) $(GCFLAGS) -o bin/$@ $(LDFLAGS) ./cmd/$@ # special case: if the current WHAT is o2-aliecs-executor, also copy over the shmcleaner script @if [ $@ = "o2-aliecs-executor" ]; then \ diff --git a/VERSION b/VERSION index 6a9804ddb..8bbf43827 100644 --- a/VERSION +++ b/VERSION @@ -1,4 +1,4 @@ # GNU Make syntax VERSION_MAJOR := 1 -VERSION_MINOR := 46 -VERSION_PATCH := 1 +VERSION_MINOR := 47 +VERSION_PATCH := 0 diff --git a/coconut/control/control.go b/coconut/control/control.go index bc0983a8b..51b09ff15 100644 --- a/coconut/control/control.go +++ b/coconut/control/control.go @@ -252,7 +252,7 @@ func GetEnvironments(cxt context.Context, rpc *coconut.RpcClient, cmd *cobra.Com data := make([][]string, 0, 0) for _, envi := range response.GetEnvironments() { - formatted := formatTimestamp(envi.GetCreatedWhen()) + formatted := formatTimestamp(time.Millisecond*time.Duration(envi.GetCreatedWhen())) data = append(data, []string{envi.GetId(), envi.GetRootRole(), formatted, colorState(envi.GetState())}) } @@ -539,7 +539,7 @@ func ShowEnvironment(cxt context.Context, rpc *coconut.RpcClient, cmd *cobra.Com _, _ = fmt.Fprintf(o, "environment id: %s\n", env.GetId()) _, _ = fmt.Fprintf(o, "workflow template: %s\n", env.GetRootRole()) _, _ = fmt.Fprintf(o, "description: %s\n", env.GetDescription()) - _, _ = fmt.Fprintf(o, "created: %s\n", formatTimestamp(env.GetCreatedWhen())) + _, _ = fmt.Fprintf(o, "created: %s\n", formatTimestamp(time.Millisecond*time.Duration(env.GetCreatedWhen()))) _, _ = fmt.Fprintf(o, "state: %s\n", colorState(env.GetState())) if currentTransition := env.GetCurrentTransition(); len(currentTransition) != 0 { _, _ = fmt.Fprintf(o, "transition: %s\n", currentTransition) diff --git a/coconut/control/controlutil.go b/coconut/control/controlutil.go index 7f6642e0b..5d85a5906 100644 --- a/coconut/control/controlutil.go +++ b/coconut/control/controlutil.go @@ -178,10 +178,9 @@ func drawTableShortTaskInfos(tasks []*pb.ShortTaskInfo, headers []string, linePr table.Render() } -func formatTimestamp(int64timestamp int64) string { - timestamp := time.Unix(0, int64timestamp) - formatted := timestamp.Local().Format("2006-01-02 15:04:05 MST") - return formatted +func formatTimestamp(timestamp time.Duration) string { + time := time.Unix(int64(timestamp.Seconds()), 0) + return time.Local().Format("2006-01-02 15:04:05 MST") } func formatNumber(numberOfMachines int32) string { diff --git a/executor/eventloop.go b/executor/eventloop.go index 3c19ed251..5fe38d14c 100644 --- a/executor/eventloop.go +++ b/executor/eventloop.go @@ -69,6 +69,11 @@ func eventLoop(state *internalState, decoder encoding.Decoder, h events.Handler) case e := <-eventCh: log.Trace("EVENT LOOP about to handle event") err = h.HandleEvent(ctx, &e) + // if we get an error here we are stoping the eventloop before triggering another nextEventNotify + // so it won't get stuck on eventCh, errorCh + if err != nil { + break + } // Spawn a goroutine to wait for the next event go nextEventNotify(decoder, eventCh, errorCh) @@ -100,7 +105,7 @@ func sendFailedTasks(state *internalState) { delete(state.failedTasks, taskID) // If there aren't any failed and active tasks, we request to shutdown the executor. if len(state.failedTasks) == 0 && len(state.activeTasks) == 0 { - //Originally state.shouldQuit = true but we want to keep the executor running + // Originally state.shouldQuit = true but we want to keep the executor running log.WithField("executorId", state.executor.ExecutorID).Info("task failure notified, no tasks present on executor") } } diff --git a/executor/executable/basictaskcommon.go b/executor/executable/basictaskcommon.go index 166270962..068752aef 100644 --- a/executor/executable/basictaskcommon.go +++ b/executor/executable/basictaskcommon.go @@ -28,12 +28,13 @@ import ( "bytes" "encoding/json" "errors" - "github.com/AliceO2Group/Control/common/utils" "io" "os/exec" "syscall" "time" + "github.com/AliceO2Group/Control/common/utils" + "github.com/AliceO2Group/Control/common/controlmode" "github.com/AliceO2Group/Control/common/event" "github.com/AliceO2Group/Control/common/logger/infologger" @@ -73,6 +74,8 @@ func (t *basicTaskBase) startBasicTask() (err error) { var errStdout, errStderr error var stdoutBuf, stderrBuf bytes.Buffer var stdout, stderr io.Writer + // To be closed after task is done + var stdoutLog, stderrLog *io.PipeWriter if t.Tci.Stdout == nil { none := "none" @@ -85,7 +88,7 @@ func (t *basicTaskBase) startBasicTask() (err error) { switch *t.Tci.Stdout { case "stdout": - stdoutLog := log.WithPrefix("task-stdout"). + stdoutLog = log.WithPrefix("task-stdout"). WithField("level", infologger.IL_Support). WithField("partition", t.knownEnvironmentId.String()). WithField("detector", t.knownDetector). @@ -97,7 +100,7 @@ func (t *basicTaskBase) startBasicTask() (err error) { stdout = io.MultiWriter(stdoutLog, &stdoutBuf) case "all": - stdoutLog := log.WithPrefix("task-stdout"). + stdoutLog = log.WithPrefix("task-stdout"). WithField("level", infologger.IL_Support). WithField("partition", t.knownEnvironmentId.String()). WithField("detector", t.knownDetector). @@ -112,7 +115,7 @@ func (t *basicTaskBase) startBasicTask() (err error) { switch *t.Tci.Stderr { case "stdout": - stderrLog := log.WithPrefix("task-stderr"). + stderrLog = log.WithPrefix("task-stderr"). WithField("level", infologger.IL_Support). WithField("partition", t.knownEnvironmentId.String()). WithField("detector", t.knownDetector). @@ -124,7 +127,7 @@ func (t *basicTaskBase) startBasicTask() (err error) { stderr = io.MultiWriter(stderrLog, &stderrBuf) case "all": - stderrLog := log.WithPrefix("task-stderr"). + stderrLog = log.WithPrefix("task-stderr"). WithField("level", infologger.IL_Support). WithField("partition", t.knownEnvironmentId.String()). WithField("detector", t.knownDetector). @@ -142,7 +145,6 @@ func (t *basicTaskBase) startBasicTask() (err error) { stderrIn, _ := t.taskCmd.StderrPipe() err = t.taskCmd.Start() - if err != nil { log.WithField("partition", t.knownEnvironmentId.String()). WithFields(logrus.Fields{ @@ -153,6 +155,8 @@ func (t *basicTaskBase) startBasicTask() (err error) { }). Error("failed to run basic task") + closePipeWriters(stdoutLog, stderrLog) + return err } log.WithField("partition", t.knownEnvironmentId.String()). @@ -172,6 +176,8 @@ func (t *basicTaskBase) startBasicTask() (err error) { err = taskCmd.Wait() // ^ when this unblocks, the task is done + closePipeWriters(stdoutLog, stderrLog) + pendingState := mesos.TASK_FINISHED var tciCommandStr string if t.Tci.Value != nil { @@ -243,6 +249,15 @@ func (t *basicTaskBase) startBasicTask() (err error) { return err } +func closePipeWriters(stdoutLog *io.PipeWriter, stderrLog *io.PipeWriter) { + if stdoutLog != nil { + stdoutLog.Close() + } + if stderrLog != nil { + stderrLog.Close() + } +} + func (t *basicTaskBase) ensureBasicTaskKilled() (err error) { if t.taskCmd == nil { return nil diff --git a/executor/executable/controllabletask.go b/executor/executable/controllabletask.go index e45256bdc..c8794c7f0 100644 --- a/executor/executable/controllabletask.go +++ b/executor/executable/controllabletask.go @@ -29,6 +29,7 @@ import ( "encoding/json" "errors" "io" + "os/exec" "reflect" "strings" "syscall" @@ -62,6 +63,7 @@ type ControllableTask struct { taskBase rpc *executorcmd.RpcClient pendingFinalTaskStateCh chan mesos.TaskState + taskDoneCh chan error knownPid int } @@ -71,48 +73,39 @@ type CommitResponse struct { } func (t *ControllableTask) Launch() error { - log.WithFields(logrus.Fields{ + defaultLogFields := logrus.Fields{ "taskId": t.ti.TaskID.GetValue(), "taskName": t.ti.Name, - "level": infologger.IL_Devel, "partition": t.knownEnvironmentId.String(), "detector": t.knownDetector, - }).Debug("executor.ControllableTask.Launch begin") + } + + log.WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Devel). + Debug("executor.ControllableTask.Launch begin") launchStartTime := time.Now() defer utils.TimeTrack(launchStartTime, "executor.ControllableTask.Launch", - log.WithFields(logrus.Fields{ - "taskId": t.ti.TaskID.GetValue(), - "taskName": t.ti.Name, - "level": infologger.IL_Devel, - "partition": t.knownEnvironmentId.String(), - "detector": t.knownDetector, - })) + log.WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Devel)) t.pendingFinalTaskStateCh = make(chan mesos.TaskState, 1) // we use this to receive a pending status update if the task was killed taskCmd, err := prepareTaskCmd(t.Tci) if err != nil { msg := "cannot build task command" - log.WithFields(logrus.Fields{ - "id": t.ti.TaskID.Value, - "task": t.ti.Name, - "error": err, - "partition": t.knownEnvironmentId.String(), - "detector": t.knownDetector, - }). + log.WithFields(defaultLogFields). + WithError(err). Error(msg) t.sendStatus(t.knownEnvironmentId, mesos.TASK_FAILED, msg+": "+err.Error()) return err } - log.WithField("payload", string(t.ti.GetData()[:])). - WithField("task", t.ti.Name). - WithField("level", infologger.IL_Devel). - WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). + log.WithFields(defaultLogFields). + WithField("payload", string(t.ti.GetData()[:])). + WithField(infologger.Level, infologger.IL_Devel). Debug("starting task asynchronously") // We fork out into a goroutine for the actual process management. @@ -120,523 +113,458 @@ func (t *ControllableTask) Launch() error { // Anything in the following goroutine must not touch *internalState, except // via channels. go func() { - truncatedCmd := executorutil.TruncateCommandBeforeTheLastPipe(t.Tci.GetValue(), 500) - log.WithFields(logrus.Fields{ - "cmd": truncatedCmd, - "taskId": t.ti.TaskID.GetValue(), - "taskName": t.ti.Name, - "level": infologger.IL_Devel, - "partition": t.knownEnvironmentId.String(), - "detector": t.knownDetector, - }).Debug("executor.ControllableTask.Launch.async begin") - - // Set up pipes for controlled process - var errStdout, errStderr error - stdoutIn, _ := taskCmd.StdoutPipe() - stderrIn, _ := taskCmd.StderrPipe() - - err = taskCmd.Start() - if err != nil { - log.WithFields(logrus.Fields{ - "id": t.ti.TaskID.Value, - "task": t.ti.Name, - "error": err.Error(), - "command": truncatedCmd, - "partition": t.knownEnvironmentId.String(), - "detector": t.knownDetector, - }). - Error("failed to run task") - - t.sendStatus(t.knownEnvironmentId, mesos.TASK_FAILED, err.Error()) - _ = t.doTermIntKill(-taskCmd.Process.Pid) - return - } - log.WithField("id", t.ti.TaskID.Value). - WithField("task", t.ti.Name). - WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - Debug("task launched") - - utils.TimeTrack(launchStartTime, - "executor.ControllableTask.Launch.async: Launch begin to taskCmd.Start() complete", - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "cmd": truncatedCmd, - "taskId": t.ti.TaskID.GetValue(), - "taskName": t.ti.Name, - "level": infologger.IL_Devel, - })) - - if t.Tci.Stdout == nil { - none := "none" - t.Tci.Stdout = &none + t.doLaunchTask(taskCmd, launchStartTime) + }() + + log.WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Devel). + Debug("gRPC client starting, handler forked: executor.ControllableTask.Launch end") + return nil +} + +func (t *ControllableTask) doLaunchTask(taskCmd *exec.Cmd, launchStartTime time.Time) { + defaultLogFields := logrus.Fields{ + "taskId": t.ti.TaskID.GetValue(), + "taskName": t.ti.Name, + "partition": t.knownEnvironmentId.String(), + "detector": t.knownDetector, + } + truncatedCmd := executorutil.TruncateCommandBeforeTheLastPipe(t.Tci.GetValue(), 500) + + log.WithFields(defaultLogFields). + WithField("command", truncatedCmd). + WithField(infologger.Level, infologger.IL_Devel). + Debug("executor.ControllableTask.Launch.async begin") + + // Set up pipes for controlled process. They have to be retrieved before starting the task. + stdoutIn, _ := taskCmd.StdoutPipe() + stderrIn, _ := taskCmd.StderrPipe() + + err := taskCmd.Start() + if err != nil { + log.WithFields(defaultLogFields). + WithField("command", truncatedCmd). + WithError(err). + Error("failed to run task") + + t.sendStatus(t.knownEnvironmentId, mesos.TASK_FAILED, err.Error()) + // no need to close IO pipes, Cmd.Start does it on failure + return + } + log.WithFields(defaultLogFields).Debug("task launched") + + utils.TimeTrack(launchStartTime, + "executor.ControllableTask.Launch.async: Launch begin to taskCmd.Start() complete", + log.WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Devel)) + + t.initTaskStdLogging(stdoutIn, stderrIn) + + // We start to Wait() for the result already, so we have access to ProcessState on an early failure + t.taskDoneCh = make(chan error, 1) + go func() { + t.taskDoneCh <- taskCmd.Wait() + }() + + log.WithFields(defaultLogFields). + WithFields(logrus.Fields{ + "controlPort": t.Tci.ControlPort, + "controlMode": t.Tci.ControlMode.String(), + "path": taskCmd.Path, + "argv": "[ " + strings.Join(taskCmd.Args, ", ") + " ]", + "argc": len(taskCmd.Args), + infologger.Level: infologger.IL_Devel, + }).Debug("starting gRPC client") + + controlTransport := executorcmd.ProtobufTransport + for _, v := range taskCmd.Args { + if strings.Contains(v, "-P OCClite") { + controlTransport = executorcmd.JsonTransport + break } - if t.Tci.Stderr == nil { - none := "none" - t.Tci.Stderr = &none + } + + rpcDialStartTime := time.Now() + t.rpc = executorcmd.NewClient( + t.Tci.ControlPort, + t.Tci.ControlMode, + controlTransport, + log.WithPrefix("executorcmd"). + WithFields(defaultLogFields), + ) + if t.rpc == nil { + // Check if the task is still running by checking ProcessState + if taskCmd.ProcessState != nil { + err = errors.New("AliECS executor could not connect to task, likely crashed on startup") + } else { + err = errors.New("AliECS executor could not connect to task, likely took too long to start") } + taskClassName, _ := utils.ExtractTaskClassName(t.ti.Name) + log.WithFields(logrus.Fields{ + "task": utils.TrimJitPrefix(taskClassName), + "partition": t.knownEnvironmentId.String(), + "detector": t.knownDetector, + infologger.Level: infologger.IL_Ops, + }).Error(err.Error()) + + t.sendStatus(t.knownEnvironmentId, mesos.TASK_FAILED, err.Error()) + + t.cleanupFailedTask(taskCmd) + return + } + t.rpc.TaskCmd = taskCmd + + utils.TimeTrack(launchStartTime, + "executor.ControllableTask.Launch.async: Launch begin to gRPC client dial success", + log.WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Devel)) + + utils.TimeTrack(rpcDialStartTime, + "executor.ControllableTask.Launch.async: gRPC client dial begin to gRPC client dial success", + log.WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Devel)) + + err = t.pollTaskForStandbyState() + if err != nil { + log.WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Support). + WithError(err). + Error("failed to poll task for standby state") + + t.sendStatus(t.knownEnvironmentId, mesos.TASK_FAILED, err.Error()) + + t.cleanupFailedTask(taskCmd) + return + } + + utils.TimeTrack(launchStartTime, + "executor.ControllableTask.Launch.async: Launch begin to gRPC state polling done", + log.WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Devel)) + + // Set up event stream from task + esc, err := t.rpc.EventStream(context.TODO(), &pb.EventStreamRequest{}, grpc.EmptyCallOption{}) + if err != nil { + log.WithFields(defaultLogFields). + WithError(err). + Error("cannot set up event stream from task") + t.sendStatus(t.knownEnvironmentId, mesos.TASK_FAILED, err.Error()) + t.cleanupFailedTask(taskCmd) + return + } + + // send RUNNING + t.sendStatus(t.knownEnvironmentId, mesos.TASK_RUNNING, "") + taskMessage := event.NewAnnounceTaskPIDEvent(t.ti.TaskID.GetValue(), int32(t.knownPid)) + taskMessage.SetLabels(map[string]string{"detector": t.knownDetector, "environmentId": t.knownEnvironmentId.String()}) + + jsonEvent, err := json.Marshal(taskMessage) + if err != nil { + log.WithFields(defaultLogFields). + WithError(err). + Warning("error marshaling message") + } else { + t.sendMessage(jsonEvent) + log.WithFields(defaultLogFields). + WithField("command", truncatedCmd). + WithField(infologger.Level, infologger.IL_Devel). + Debug("executor.ControllableTask.Launch.async: TASK_RUNNING sent back to core") + } + + // Process events from task in yet another goroutine + go func() { + t.processEventsFromTask(esc) + }() + + err = <-t.taskDoneCh + // ^ when this unblocks, the task is done + log.WithFields(defaultLogFields). + WithField("command", truncatedCmd). + WithField(infologger.Level, infologger.IL_Devel). + Debug("task done (taskCmd.Wait unblocks), preparing final update") + + pendingState := mesos.TASK_FINISHED + if err != nil { + taskClassName, _ := utils.ExtractTaskClassName(t.ti.Name) + log.WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Ops). + Errorf("task '%s' terminated with error: %s", utils.TrimJitPrefix(taskClassName), err.Error()) + log.WithFields(defaultLogFields). + WithField("command", truncatedCmd). + WithField(infologger.Level, infologger.IL_Devel). + WithError(err). + Error("task terminated with error (details):") + pendingState = mesos.TASK_FAILED + } + + select { + case pending := <-t.pendingFinalTaskStateCh: + pendingState = pending + default: + } + + if t.rpc != nil { + _ = t.rpc.Close() // NOTE: might return non-nil error, but we don't care much + log.WithFields(defaultLogFields). + Debug("rpc client closed") + t.rpc = nil + log.WithFields(defaultLogFields). + Debug("rpc client removed") + } + + t.sendStatus(t.knownEnvironmentId, pendingState, "") + return +} + +func (t *ControllableTask) cleanupFailedTask(taskCmd *exec.Cmd) { + + defaultLogFields := logrus.Fields{ + "taskId": t.ti.TaskID.GetValue(), + "taskName": t.ti.Name, + "partition": t.knownEnvironmentId.String(), + "detector": t.knownDetector, + } + + if taskCmd.Process == nil { + // task never started or was already terminated + return + } + + if t.rpc != nil { + _ = t.rpc.Close() + t.rpc = nil + } + + pid := t.knownPid + if pid == 0 { + // The pid was never known through a successful `GetState` in the lifetime + // of this process, so we must rely on the PGID of the containing shell + pid = -taskCmd.Process.Pid + } + + _ = t.doTermIntKill(-taskCmd.Process.Pid) + + // Wait for task to finish and report the error + err := <-t.taskDoneCh + if err != nil { + log.WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Support). + WithError(err). + Warning("task terminated and exited with error") + } else { + log.WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Support). + Debug("task terminated") + } +} + +func (t *ControllableTask) initTaskStdLogging(stdoutIn io.ReadCloser, stderrIn io.ReadCloser) { + defaultLogFields := logrus.Fields{ + "taskId": t.ti.TaskID.GetValue(), + "taskName": t.ti.Name, + "partition": t.knownEnvironmentId.String(), + "detector": t.knownDetector, + } + + if t.Tci.Stdout == nil { + none := "none" + t.Tci.Stdout = &none + } + if t.Tci.Stderr == nil { + none := "none" + t.Tci.Stderr = &none + } + + go func() { + var errStdout error switch *t.Tci.Stdout { case "stdout": - go func() { - entry := log.WithPrefix("task-stdout"). - WithField("level", infologger.IL_Support). - WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("task", t.ti.Name). - WithField("nohooks", true) - writer := &logger.SafeLogrusWriter{ - Entry: entry, - PrintFunc: entry.Debug, - } - _, errStdout = io.Copy(writer, stdoutIn) - writer.Flush() - }() + entry := log.WithPrefix("task-stdout"). + WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Support). + WithField("nohooks", true) + writer := &logger.SafeLogrusWriter{ + Entry: entry, + PrintFunc: entry.Debug, + } + _, errStdout = io.Copy(writer, stdoutIn) + writer.Flush() case "all": - go func() { - entry := log.WithPrefix("task-stdout"). - WithField("level", infologger.IL_Support). - WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("task", t.ti.Name) - writer := &logger.SafeLogrusWriter{ - Entry: entry, - PrintFunc: entry.Debug, - } - _, errStdout = io.Copy(writer, stdoutIn) - writer.Flush() - }() + entry := log.WithPrefix("task-stdout"). + WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Support) + writer := &logger.SafeLogrusWriter{ + Entry: entry, + PrintFunc: entry.Debug, + } + _, errStdout = io.Copy(writer, stdoutIn) + writer.Flush() default: - go func() { - _, errStdout = io.Copy(io.Discard, stdoutIn) - }() + _, errStdout = io.Copy(io.Discard, stdoutIn) } + if errStdout != nil { + log.WithFields(defaultLogFields). + WithError(errStdout). + WithField(infologger.Level, infologger.IL_Devel). + Warning("failed to capture stdout of task") + } + }() + go func() { + var errStderr error switch *t.Tci.Stderr { case "stdout": - go func() { - entry := log.WithPrefix("task-stderr"). - WithField("level", infologger.IL_Support). - WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("task", t.ti.Name). - WithField("nohooks", true) - writer := &logger.SafeLogrusWriter{ - Entry: entry, - PrintFunc: entry.Warn, - } - _, errStderr = io.Copy(writer, stderrIn) - writer.Flush() - }() + entry := log.WithPrefix("task-stderr"). + WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Support). + WithField("nohooks", true) + writer := &logger.SafeLogrusWriter{ + Entry: entry, + PrintFunc: entry.Warn, + } + _, errStderr = io.Copy(writer, stderrIn) + writer.Flush() case "all": - go func() { - entry := log.WithPrefix("task-stderr"). - WithField("level", infologger.IL_Support). - WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("task", t.ti.Name) - writer := &logger.SafeLogrusWriter{ - Entry: entry, - PrintFunc: entry.Warn, - } - _, errStderr = io.Copy(writer, stderrIn) - writer.Flush() - }() + entry := log.WithPrefix("task-stderr"). + WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Support) + writer := &logger.SafeLogrusWriter{ + Entry: entry, + PrintFunc: entry.Warn, + } + _, errStderr = io.Copy(writer, stderrIn) + writer.Flush() default: - go func() { - _, errStderr = io.Copy(io.Discard, stderrIn) - }() + _, errStderr = io.Copy(io.Discard, stderrIn) + } + if errStderr != nil { + log.WithFields(defaultLogFields). + WithError(errStderr). + WithField(infologger.Level, infologger.IL_Devel). + Warning("failed to capture stderr of task") } + }() +} - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "controlPort": t.Tci.ControlPort, - "controlMode": t.Tci.ControlMode.String(), - "task": t.ti.Name, - "id": t.ti.TaskID.Value, - "path": taskCmd.Path, - "argv": "[ " + strings.Join(taskCmd.Args, ", ") + " ]", - "argc": len(taskCmd.Args), - "level": infologger.IL_Devel, - }). - Debug("starting gRPC client") - - controlTransport := executorcmd.ProtobufTransport - for _, v := range taskCmd.Args { - if strings.Contains(v, "-P OCClite") { - controlTransport = executorcmd.JsonTransport - break - } +func (t *ControllableTask) pollTaskForStandbyState() error { + defaultLogFields := logrus.Fields{ + "taskId": t.ti.TaskID.GetValue(), + "taskName": t.ti.Name, + "partition": t.knownEnvironmentId.String(), + "detector": t.knownDetector, + } + statePollingStartTime := time.Now() + elapsed := 0 * time.Second + for { + log.WithFields(defaultLogFields). + WithField("elapsed", elapsed.String()). + WithField(infologger.Level, infologger.IL_Devel). + Debug("polling task for STANDBY state reached") + + response, err := t.rpc.GetState(context.TODO(), &pb.GetStateRequest{}, grpc.EmptyCallOption{}) + reachedState := "UNKNOWN" + if err != nil { + log.WithError(err). + WithFields(defaultLogFields). + WithField("state", response.GetState()). + Info("cannot query task status") + } else { + log.WithFields(defaultLogFields). + WithField("state", response.GetState()). + WithField(infologger.Level, infologger.IL_Devel). + Debug("task status queried") + t.knownPid = int(response.GetPid()) + // NOTE: we acquire the transitioner-dependent STANDBY equivalent state + reachedState = t.rpc.FromDeviceState(response.GetState()) } - rpcDialStartTime := time.Now() + if reachedState == "STANDBY" { + log.WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Devel). + Debug("task running and ready for control input") + break + } else if reachedState == "DONE" || reachedState == "ERROR" { + // something went wrong, the device moved to DONE or ERROR on startup + return errors.New("task reached wrong state on startup") + } else if elapsed >= startupTimeout { + return errors.New("timeout while trying to poll task") + } else { + log.WithFields(defaultLogFields). + Debugf("task not ready yet, waiting %s", startupPollingInterval.String()) + time.Sleep(startupPollingInterval) + elapsed += startupPollingInterval + } + } - t.rpc = executorcmd.NewClient( - t.Tci.ControlPort, - t.Tci.ControlMode, - controlTransport, - log.WithPrefix("executorcmd"). - WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "id": t.ti.TaskID.Value, - "task": t.ti.Name, - "command": truncatedCmd, - }, - ), - ) - if t.rpc == nil { - err = errors.New("rpc client is nil") - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "id": t.ti.TaskID.Value, - "task": t.ti.Name, - "error": err.Error(), - "command": truncatedCmd, - }). - WithField("level", infologger.IL_Devel). - Error("could not start gRPC client") + utils.TimeTrack(statePollingStartTime, + "executor.ControllableTask.Launch.async: gRPC state polling begin to gRPC state polling done", + log.WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Devel)) + return nil +} - t.sendStatus(t.knownEnvironmentId, mesos.TASK_FAILED, err.Error()) - _ = t.doTermIntKill(-taskCmd.Process.Pid) - return - } - t.rpc.TaskCmd = taskCmd +func (t *ControllableTask) processEventsFromTask(esc pb.Occ_EventStreamClient) { + defaultLogFields := logrus.Fields{ + "taskId": t.ti.TaskID.GetValue(), + "taskName": t.ti.Name, + "partition": t.knownEnvironmentId.String(), + "detector": t.knownDetector, + } + deo := event.DeviceEventOrigin{ + AgentId: t.ti.AgentID, + ExecutorId: t.ti.GetExecutor().ExecutorID, + TaskId: t.ti.TaskID, + } - utils.TimeTrack(launchStartTime, - "executor.ControllableTask.Launch.async: Launch begin to gRPC client dial success", - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "command": truncatedCmd, - "taskId": t.ti.TaskID.GetValue(), - "taskName": t.ti.Name, - "level": infologger.IL_Devel, - })) - - utils.TimeTrack(rpcDialStartTime, - "executor.ControllableTask.Launch.async: gRPC client dial begin to gRPC client dial success", - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "command": truncatedCmd, - "taskId": t.ti.TaskID.GetValue(), - "taskName": t.ti.Name, - "level": infologger.IL_Devel, - })) - - statePollingStartTime := time.Now() - elapsed := 0 * time.Second - for { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "id": t.ti.TaskID.Value, - "task": t.ti.Name, - "command": truncatedCmd, - "elapsed": elapsed.String(), - "level": infologger.IL_Devel, - }). - Debug("polling task for IDLE state reached") - - response, err := t.rpc.GetState(context.TODO(), &pb.GetStateRequest{}, grpc.EmptyCallOption{}) - if err != nil { - log.WithError(err). - WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "state": response.GetState(), - "task": t.ti.Name, - "command": truncatedCmd, - }). - Info("cannot query task status") - } else { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "state": response.GetState(), - "task": t.ti.Name, - "command": truncatedCmd, - "level": infologger.IL_Devel, - }). - Debug("task status queried") - t.knownPid = int(response.GetPid()) - } - // NOTE: we acquire the transitioner-dependent STANDBY equivalent state - reachedState := t.rpc.FromDeviceState(response.GetState()) - - if reachedState == "STANDBY" && err == nil { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("id", t.ti.TaskID.Value). - WithField("task", t.ti.Name). - WithField("command", truncatedCmd). - WithField("level", infologger.IL_Devel). - Debug("task running and ready for control input") - break - } else if reachedState == "DONE" || reachedState == "ERROR" { - // something went wrong, the device moved to DONE or ERROR on startup - pid := t.knownPid - if pid == 0 { - // The pid was never known through a successful `GetState` in the lifetime - // of this process, so we must rely on the PGID of the containing shell - pid = -t.rpc.TaskCmd.Process.Pid - } - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("taskId", t.ti.Name). - Debug("sending SIGKILL (9) to task") - _ = syscall.Kill(pid, syscall.SIGKILL) - _ = stdoutIn.Close() - _ = stderrIn.Close() - - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("task", t.ti.Name).Debug("task killed") - t.sendStatus(t.knownEnvironmentId, mesos.TASK_FAILED, "task reached wrong state on startup") - return - } else if elapsed >= startupTimeout { - err = errors.New("timeout while waiting for task startup") - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("task", t.ti.Name).Error(err.Error()) - t.sendStatus(t.knownEnvironmentId, mesos.TASK_FAILED, err.Error()) - _ = t.rpc.Close() - t.rpc = nil - - _ = stdoutIn.Close() - _ = stderrIn.Close() - - return - } else { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("task", t.ti.Name). - WithField("command", truncatedCmd). - Debugf("task not ready yet, waiting %s", startupPollingInterval.String()) - time.Sleep(startupPollingInterval) - elapsed += startupPollingInterval - } + for { + if t.rpc == nil { + log.WithFields(defaultLogFields). + Debug("event stream done") + break } - - utils.TimeTrack(launchStartTime, - "executor.ControllableTask.Launch.async: Launch begin to gRPC state polling done", - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "command": truncatedCmd, - "taskId": t.ti.TaskID.GetValue(), - "taskName": t.ti.Name, - "level": infologger.IL_Devel, - })) - - utils.TimeTrack(statePollingStartTime, - "executor.ControllableTask.Launch.async: gRPC state polling begin to gRPC state polling done", - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "command": truncatedCmd, - "taskId": t.ti.TaskID.GetValue(), - "taskName": t.ti.Name, - "level": infologger.IL_Devel, - })) - - // Set up event stream from task - esc, err := t.rpc.EventStream(context.TODO(), &pb.EventStreamRequest{}, grpc.EmptyCallOption{}) - if err != nil { - log.WithField("task", t.ti.Name). + esr, err := esc.Recv() + if err == io.EOF { + log.WithFields(defaultLogFields). WithError(err). - WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - Error("cannot set up event stream from task") - t.sendStatus(t.knownEnvironmentId, mesos.TASK_FAILED, err.Error()) - _ = t.rpc.Close() - t.rpc = nil - return + Debug("event stream EOF") + break } - - // send RUNNING - t.sendStatus(t.knownEnvironmentId, mesos.TASK_RUNNING, "") - taskMessage := event.NewAnnounceTaskPIDEvent(t.ti.TaskID.GetValue(), int32(t.knownPid)) - taskMessage.SetLabels(map[string]string{"detector": t.knownDetector, "environmentId": t.knownEnvironmentId.String()}) - - jsonEvent, err := json.Marshal(taskMessage) if err != nil { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("taskId", t.ti.TaskID.GetValue()). - WithField("taskName", t.ti.Name). + log.WithFields(defaultLogFields). + WithField("errorType", reflect.TypeOf(err)). + WithField(infologger.Level, infologger.IL_Devel). WithError(err). - Warning("error marshaling message") - } else { - t.sendMessage(jsonEvent) - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "command": truncatedCmd, - "taskId": t.ti.TaskID.GetValue(), - "taskName": t.ti.Name, - "level": infologger.IL_Devel, - }).Debug("executor.ControllableTask.Launch.async: TASK_RUNNING sent back to core") - } - - // Process events from task in yet another goroutine - go func() { - deo := event.DeviceEventOrigin{ - AgentId: t.ti.AgentID, - ExecutorId: t.ti.GetExecutor().ExecutorID, - TaskId: t.ti.TaskID, - } - for { - if t.rpc == nil { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("taskId", deo.TaskId.GetValue()). - WithField("taskName", t.ti.Name). - WithError(err). - Debug("event stream done") - break - } - esr, err := esc.Recv() - if err == io.EOF { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("taskId", deo.TaskId.GetValue()). - WithField("taskName", t.ti.Name). - WithError(err). - Debug("event stream EOF") - break - } - if err != nil { - log.WithError(err). - WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("errorType", reflect.TypeOf(err)). - WithField("level", infologger.IL_Devel). - WithField("taskId", deo.TaskId.GetValue()). - WithField("taskName", t.ti.Name). - Warning("error receiving event") - if status.Code(err) == codes.Unavailable { - break - } - continue - } - ev := esr.GetEvent() - - deviceEvent := event.NewDeviceEvent(deo, ev.GetType()) - if deviceEvent == nil { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("taskId", deo.TaskId.GetValue()). - WithField("taskName", t.ti.Name). - Debug("nil DeviceEvent received (NULL_DEVICE_EVENT) - closing stream") - break - } else { - taskId := deo.TaskId.Value - - if deviceEvent.GetType() == pb.DeviceEventType_END_OF_STREAM { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("taskId", taskId). - WithField("taskName", t.ti.Name). - WithField("taskPid", t.knownPid). - Debug("END_OF_STREAM DeviceEvent received - notifying environment") - } else if ev.GetType() == pb.DeviceEventType_TASK_INTERNAL_ERROR { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("taskId", taskId). - WithField("taskName", t.ti.Name). - WithField("taskPid", t.knownPid). - WithField("level", infologger.IL_Support). - Warningf("task transitioned to ERROR on its own - notifying environment") - } - } - deviceEvent.SetLabels(map[string]string{"detector": t.knownDetector, "environmentId": t.knownEnvironmentId.String()}) - - t.sendDeviceEvent(t.knownEnvironmentId, deviceEvent) + Warning("error receiving event") + if status.Code(err) == codes.Unavailable { + break } - }() - - err = taskCmd.Wait() - // ^ when this unblocks, the task is done - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "id": t.ti.TaskID.Value, - "task": t.ti.Name, - "command": truncatedCmd, - "level": infologger.IL_Devel, - }).Debug("task done (taskCmd.Wait unblocks), preparing final update") - - pendingState := mesos.TASK_FINISHED - if err != nil { - taskClassName, _ := utils.ExtractTaskClassName(t.ti.Name) - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("level", infologger.IL_Ops). - Errorf("task '%s' terminated with error: %s", utils.TrimJitPrefix(taskClassName), err.Error()) - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "id": t.ti.TaskID.Value, - "task": t.ti.Name, - "command": truncatedCmd, - "error": err.Error(), - "level": infologger.IL_Devel, - }). - Error("task terminated with error (details):") - pendingState = mesos.TASK_FAILED - } - - select { - case pending := <-t.pendingFinalTaskStateCh: - pendingState = pending - default: - } - - if t.rpc != nil { - _ = t.rpc.Close() // NOTE: might return non-nil error, but we don't care much - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("taskId", t.ti.TaskID.GetValue()). - WithField("taskName", t.ti.Name). - Debug("rpc client closed") - t.rpc = nil - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("taskId", t.ti.TaskID.GetValue()). - WithField("taskName", t.ti.Name). - Debug("rpc client removed") + // fixme: we also get codes.Canceled sometimes, it's probably OK and we should not complain + continue } + ev := esr.GetEvent() - if errStdout != nil || errStderr != nil { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "errStderr": errStderr, - "errStdout": errStdout, - "id": t.ti.TaskID.Value, - "task": t.ti.Name, - "command": truncatedCmd, - "level": infologger.IL_Devel, - }). - Warning("failed to capture stdout or stderr of task") + deviceEvent := event.NewDeviceEvent(deo, ev.GetType()) + if deviceEvent == nil { + log.WithFields(defaultLogFields). + Debug("nil DeviceEvent received (NULL_DEVICE_EVENT) - closing stream") + break + } else { + if deviceEvent.GetType() == pb.DeviceEventType_END_OF_STREAM { + log.WithFields(defaultLogFields). + WithField("taskPid", t.knownPid). + Debug("END_OF_STREAM DeviceEvent received - notifying environment") + } else if ev.GetType() == pb.DeviceEventType_TASK_INTERNAL_ERROR { + log.WithFields(defaultLogFields). + WithField("taskPid", t.knownPid). + WithField(infologger.Level, infologger.IL_Support). + Warningf("task transitioned to ERROR on its own - notifying environment") + } } + deviceEvent.SetLabels(map[string]string{"detector": t.knownDetector, "environmentId": t.knownEnvironmentId.String()}) - t.sendStatus(t.knownEnvironmentId, pendingState, "") - }() - - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithFields(logrus.Fields{ - "task": t.ti.Name, - "level": infologger.IL_Devel, - }). - Debug("gRPC client starting, handler forked: executor.ControllableTask.Launch end") - return nil + t.sendDeviceEvent(t.knownEnvironmentId, deviceEvent) + } } func (t *ControllableTask) UnmarshalTransition(data []byte) (cmd *executorcmd.ExecutorCommand_Transition, err error) { @@ -662,6 +590,13 @@ func (t *ControllableTask) Transition(cmd *executorcmd.ExecutorCommand_Transitio } func (t *ControllableTask) Kill() error { + defaultLogFields := logrus.Fields{ + "taskId": t.ti.TaskID.GetValue(), + "taskName": t.ti.Name, + "partition": t.knownEnvironmentId.String(), + "detector": t.knownDetector, + } + var ( pid = 0 reachedState = "UNKNOWN" // FIXME: should be LAUNCHING or similar @@ -670,11 +605,9 @@ func (t *ControllableTask) Kill() error { defer cancel() response, err := t.rpc.GetState(cxt, &pb.GetStateRequest{}, grpc.EmptyCallOption{}) if err == nil { // we successfully got the state from the task - log.WithField("nativeState", response.GetState()). - WithField("taskId", t.ti.GetTaskID()). - WithField("level", infologger.IL_Devel). - WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). + log.WithFields(defaultLogFields). + WithField("nativeState", response.GetState()). + WithField(infologger.Level, infologger.IL_Devel). Debug("task status queried for upcoming soft kill") // NOTE: we acquire the transitioner-dependent STANDBY equivalent state @@ -717,19 +650,18 @@ func (t *ControllableTask) Kill() error { for reachedState != "DONE" { cmd := nextTransition(reachedState) - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). + log.WithFields(defaultLogFields). WithFields(logrus.Fields{ - "evt": cmd.Event, - "src": cmd.Source, - "dst": cmd.Destination, - "targetList": cmd.TargetList, - "level": infologger.IL_Devel, + "evt": cmd.Event, + "src": cmd.Source, + "dst": cmd.Destination, + "targetList": cmd.TargetList, + infologger.Level: infologger.IL_Devel, }). Debug("state DONE not reached, about to commit transition") - // Call cmd.Commit() asynchronous - commitDone := make(chan *CommitResponse) + // Call cmd.Commit() asynchronous with buffered channel so it doesn't get stuck in a case of timeout + commitDone := make(chan *CommitResponse, 1) go func() { var cr CommitResponse cr.newState, cr.transitionError = cmd.Commit() @@ -741,10 +673,8 @@ func (t *ControllableTask) Kill() error { select { case commitResponse = <-commitDone: case <-time.After(KILL_TRANSITION_TIMEOUT): - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("task", t.ti.TaskID.Value). - WithField("level", infologger.IL_Devel). + log.WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Devel). Warn("teardown transition sequence timed out") } // timeout we should break @@ -752,29 +682,23 @@ func (t *ControllableTask) Kill() error { break } - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). + log.WithFields(defaultLogFields). WithField("newState", commitResponse.newState). WithError(commitResponse.transitionError). - WithField("task", t.ti.TaskID.Value). - WithField("level", infologger.IL_Devel). + WithField(infologger.Level, infologger.IL_Devel). Debug("transition committed") if commitResponse.transitionError != nil || len(cmd.Event) == 0 { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). + log.WithFields(defaultLogFields). WithError(commitResponse.transitionError). - WithField("task", t.ti.TaskID.Value). - WithField("level", infologger.IL_Devel). + WithField(infologger.Level, infologger.IL_Devel). Warn("teardown transition sequence error") break } reachedState = commitResponse.newState } - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("task", t.ti.TaskID.Value). - WithField("level", infologger.IL_Devel). + log.WithFields(defaultLogFields). + WithField(infologger.Level, infologger.IL_Devel). Debug("teardown transition sequence done") pid = int(response.GetPid()) if pid == 0 { @@ -785,10 +709,8 @@ func (t *ControllableTask) Kill() error { // If GetState didn't succeed during this Kill code path, but might still have // at some earlier point during the lifetime of this task. // Either way, we might or might not have the true PID. - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). + log.WithFields(defaultLogFields). WithError(err). - WithField("taskId", t.ti.GetTaskID()). Warn("cannot query task status for graceful process termination") pid = t.knownPid if pid == 0 { @@ -802,9 +724,8 @@ func (t *ControllableTask) Kill() error { // terminate the shell that is wrapping the command, so we avoid using // negative PID is all other cases in order to allow FairMQ cleanup to // run. - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithError(err).WithField("taskId", t.ti.GetTaskID()). + log.WithFields(defaultLogFields). + WithError(err). Warn("task PID not known from task, using containing shell PGID") } } @@ -813,16 +734,12 @@ func (t *ControllableTask) Kill() error { t.rpc = nil if reachedState == "DONE" { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("taskId", t.ti.TaskID.Value). + log.WithFields(defaultLogFields). Debugf("task reached DONE, will wait %.1fs before terminating it", DONE_TIMEOUT.Seconds()) t.pendingFinalTaskStateCh <- mesos.TASK_FINISHED time.Sleep(DONE_TIMEOUT) } else { // something went wrong - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("taskId", t.ti.TaskID.Value). + log.WithFields(defaultLogFields). Debug("task died already or will be killed soon") t.pendingFinalTaskStateCh <- mesos.TASK_KILLED } @@ -830,25 +747,26 @@ func (t *ControllableTask) Kill() error { if pidExists(pid) { return t.doTermIntKill(pid) } else { - log.WithField("taskId", t.ti.GetTaskID()). - WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). + log.WithFields(defaultLogFields). Debugf("task terminated on its own") return nil } } func (t *ControllableTask) doKill9(pid int) error { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("taskId", t.ti.GetTaskID()). + defaultLogFields := logrus.Fields{ + "taskId": t.ti.TaskID.GetValue(), + "taskName": t.ti.Name, + "partition": t.knownEnvironmentId.String(), + "detector": t.knownDetector, + } + + log.WithFields(defaultLogFields). Debug("sending SIGKILL (9) to task") killErr := syscall.Kill(pid, syscall.SIGKILL) if killErr != nil { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). + log.WithFields(defaultLogFields). WithError(killErr). - WithField("taskId", t.ti.GetTaskID()). Warning("task SIGKILL failed") } @@ -856,19 +774,21 @@ func (t *ControllableTask) doKill9(pid int) error { } func (t *ControllableTask) doTermIntKill(pid int) error { + defaultLogFields := logrus.Fields{ + "taskId": t.ti.TaskID.GetValue(), + "taskName": t.ti.Name, + "partition": t.knownEnvironmentId.String(), + "detector": t.knownDetector, + } killErrCh := make(chan error) go func() { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("taskId", t.ti.GetTaskID()). + log.WithFields(defaultLogFields). Debug("sending SIGTERM (15) to task") err := syscall.Kill(pid, syscall.SIGTERM) if err != nil { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). + log.WithFields(defaultLogFields). WithError(err). - WithField("taskId", t.ti.GetTaskID()). Warning("task SIGTERM failed") } killErrCh <- err @@ -883,16 +803,12 @@ func (t *ControllableTask) doTermIntKill(pid int) error { if pidExists(pid) { // SIGINT for the "Waiting for graceful device shutdown. // Hit Ctrl-C again to abort immediately" message. - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). - WithField("taskId", t.ti.GetTaskID()). + log.WithFields(defaultLogFields). Debug("sending SIGINT (2) to task") killErr = syscall.Kill(pid, syscall.SIGINT) if killErr != nil { - log.WithField("partition", t.knownEnvironmentId.String()). - WithField("detector", t.knownDetector). + log.WithFields(defaultLogFields). WithError(killErr). - WithField("taskId", t.ti.GetTaskID()). Warning("task SIGINT failed") } time.Sleep(SIGINT_TIMEOUT) diff --git a/executor/executorcmd/client.go b/executor/executorcmd/client.go index 738a7e521..bc9a2e7a8 100644 --- a/executor/executorcmd/client.go +++ b/executor/executorcmd/client.go @@ -80,12 +80,8 @@ func NewClient( log.WithField("error", err.Error()). WithField("endpoint", endpoint). WithField("transport", controlTransportS). - WithField("level", infologger.IL_Trace). + WithField("level", infologger.IL_Devel). Error("gRPC client can't dial") - log.WithField("error", err.Error()). - WithField("endpoint", endpoint). - WithField("level", infologger.IL_Ops). - Error("AliECS executor could not connect to task, possible crash on startup") cancel() if conn != nil { diff --git a/go.mod b/go.mod index 1fd24d1c8..90a854904 100644 --- a/go.mod +++ b/go.mod @@ -69,7 +69,7 @@ require ( require ( dario.cat/mergo v1.0.1 - github.com/expr-lang/expr v1.17.0 + github.com/expr-lang/expr v1.17.7 github.com/flosch/pongo2/v6 v6.0.0 github.com/gogo/protobuf v1.3.2 github.com/hashicorp/go-multierror v1.1.1 diff --git a/go.sum b/go.sum index a23533be8..c589d794f 100644 --- a/go.sum +++ b/go.sum @@ -61,8 +61,8 @@ github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ= github.com/envoyproxy/protoc-gen-validate v1.0.4 h1:gVPz/FMfvh57HdSJQyvBtF00j8JU4zdyUgIUNhlgg0A= github.com/envoyproxy/protoc-gen-validate v1.0.4/go.mod h1:qys6tmnRsYrQqIhm2bvKZH4Blx/1gTIZ2UKVY1M+Yew= -github.com/expr-lang/expr v1.17.0 h1:+vpszOyzKLQXC9VF+wA8cVA0tlA984/Wabc/1hF9Whg= -github.com/expr-lang/expr v1.17.0/go.mod h1:8/vRC7+7HBzESEqt5kKpYXxrxkr31SaO8r40VO/1IT4= +github.com/expr-lang/expr v1.17.7 h1:Q0xY/e/2aCIp8g9s/LGvMDCC5PxYlvHgDZRQ4y16JX8= +github.com/expr-lang/expr v1.17.7/go.mod h1:8/vRC7+7HBzESEqt5kKpYXxrxkr31SaO8r40VO/1IT4= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk=