Skip to content

Commit f81c9f7

Browse files
knopers8 authored and teo committed
[OCTRL-683] Allow the OCClib-controlled tasks to exit on their own
Two problems are solved here:
- The OccServer::EventStream thread would not exit when reaching DONE, so OCClib-controlled tasks never shut their server down.
- The executor would send SIGTERM as soon as a task reported reaching DONE, thus not allowing it to exit on its own.
1 parent ebe6e9c commit f81c9f7

4 files changed

Lines changed: 14 additions & 8 deletions

File tree

core/integration/kafka/protos/kafka.proto

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -19,4 +19,4 @@ message EnvInfo {
1919
string state = 4;
2020
repeated string detectors = 5;
2121
uint64 enterStateTimestamp = 6; // ms since epoch.
22-
}
22+
}

executor/executable/controllabletask.go

Lines changed: 11 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -49,6 +49,7 @@ import (
4949
)
5050

5151
const (
52+
DONE_TIMEOUT = 1 * time.Second
5253
SIGTERM_TIMEOUT = 1 * time.Second
5354
SIGINT_TIMEOUT = 3 * time.Second
5455
KILL_TRANSITION_TIMEOUT = 1 * time.Second
@@ -417,13 +418,13 @@ func (t *ControllableTask) Launch() error {
417418
}
418419
for {
419420
if t.rpc == nil {
420-
log.WithField("partition", t.knownEnvironmentId.String()).
421+
log.WithField("partition", t.knownEnvironmentId.String()).WithField("taskId", deo.TaskId.String()).
421422
WithError(err).Debug("event stream done")
422423
break
423424
}
424425
esr, err := esc.Recv()
425426
if err == io.EOF {
426-
log.WithField("partition", t.knownEnvironmentId.String()).
427+
log.WithField("partition", t.knownEnvironmentId.String()).WithField("taskId", deo.TaskId.String()).
427428
WithError(err).Debug("event stream EOF")
428429
break
429430
}
@@ -432,7 +433,7 @@ func (t *ControllableTask) Launch() error {
432433
WithField("partition", t.knownEnvironmentId.String()).
433434
WithField("errorType", reflect.TypeOf(err)).
434435
WithField("level", infologger.IL_Devel).
435-
Warning("error receiving event from task")
436+
Warningf("error receiving event from task %s", deo.TaskId.String())
436437
if status.Code(err) == codes.Unavailable {
437438
break
438439
}
@@ -690,15 +691,20 @@ func (t *ControllableTask) Kill() error {
690691
if reachedState == "DONE" {
691692
log.WithField("partition", t.knownEnvironmentId.String()).
692693
WithField("taskId", t.ti.TaskID.Value).
693-
Debug("task exited correctly")
694+
Debug("task reached DONE, will try to terminate it gently")
694695
t.pendingFinalTaskStateCh <- mesos.TASK_FINISHED
695696
} else { // something went wrong
696697
log.WithField("partition", t.knownEnvironmentId.String()).
697698
WithField("taskId", t.ti.TaskID.Value).
698-
Debug("task killed")
699+
Debug("task already died or will be killed soon")
699700
t.pendingFinalTaskStateCh <- mesos.TASK_KILLED
700701
}
701702

703+
if reachedState == "DONE" {
704+
log.WithField("taskId", t.ti.GetTaskID()).Debugf("waiting %.1fs before sending SIGTERM to task", DONE_TIMEOUT.Seconds())
705+
time.Sleep(DONE_TIMEOUT)
706+
}
707+
702708
return t.doTermIntKill(pid)
703709
}
704710

occ/occlib/OccServer.cxx

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -75,7 +75,7 @@ grpc::Status OccServer::EventStream(grpc::ServerContext* context,
7575
});
7676

7777
bool isStreamOpen = true;
78-
while (!m_destroying && isStreamOpen) {
78+
while (!m_destroying && isStreamOpen && m_rco->getState() != t_State::done) {
7979
pb::DeviceEvent *newEvent;
8080
bool ok = eventQueue.pop(newEvent);
8181
if (!ok) { // queue empty, sleep and retry

occ/plugin/OccLiteServer.cxx

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -106,7 +106,7 @@ ::grpc::Status OccLite::Service::Transition(::grpc::ServerContext* context,
106106

107107
auto nopbResponse = std::get<0>(transitionOutcome);
108108
*response = nopbResponse;
109-
OLOG(info) << "Transition response: " << response->state << "ok: " << response->ok;
109+
OLOG(info) << "Transition response: " << response->state << " ok: " << response->ok;
110110

111111
return grpc::Status::OK;
112112
}

0 commit comments

Comments
 (0)