Skip to content

Commit dc05890

Browse files
miltalexteo
authored andcommitted
[executor] Send TaskMessage with PID to scheduler
1 parent 1ed8908 commit dc05890

4 files changed

Lines changed: 31 additions & 2 deletions

File tree

executor/executable/basictask.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,13 @@ func (t *BasicTask) Launch() error {
238238
Debug("basic task staged")
239239

240240
go t.sendStatus(mesos.TASK_RUNNING, "")
241+
taskMessage := event.NewTaskMessage(t.ti.Name,t.ti.TaskID.GetValue(),int32(t.taskCmd.Process.Pid))
242+
jsonEvent, err := json.Marshal(taskMessage)
243+
if err != nil {
244+
log.WithError(err).Warning("error marshaling message from task")
245+
} else {
246+
t.sendMessage(jsonEvent)
247+
}
241248

242249
return nil
243250
}

executor/executable/controllabletask.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ func (t *ControllableTask) Launch() error {
147147
}
148148
t.rpc.TaskCmd = taskCmd
149149

150+
var pid int32
150151
elapsed := 0 * time.Second
151152
for {
152153
log.WithFields(logrus.Fields{
@@ -176,6 +177,7 @@ func (t *ControllableTask) Launch() error {
176177
}
177178
// NOTE: we acquire the transitioner-dependent STANDBY equivalent state
178179
reachedState := t.rpc.FromDeviceState(response.GetState())
180+
pid = response.GetPid()
179181

180182
if reachedState == "STANDBY" && err == nil {
181183
log.WithField("id", t.ti.TaskID.Value).
@@ -222,6 +224,14 @@ func (t *ControllableTask) Launch() error {
222224

223225
// send RUNNING
224226
t.sendStatus(mesos.TASK_RUNNING, "")
227+
taskMessage := event.NewTaskMessage(t.ti.Name,t.ti.TaskID.GetValue(),pid)
228+
jsonEvent, err := json.Marshal(taskMessage)
229+
if err != nil {
230+
log.WithError(err).Warning("error marshaling message from task")
231+
} else {
232+
t.sendMessage(jsonEvent)
233+
}
234+
225235

226236
// Process events from task in yet another goroutine
227237
go func() {

executor/executable/task.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ var log = logger.New(logrus.StandardLogger(), "executor")
5454

5555
type SendStatusFunc func(state mesos.TaskState, message string)
5656
type SendDeviceEventFunc func(event event.DeviceEvent)
57+
type SendMessageFunc func(message []byte)
5758

5859
type Task interface {
5960
Launch() error
@@ -68,9 +69,10 @@ type taskBase struct {
6869

6970
sendStatus SendStatusFunc
7071
sendDeviceEvent SendDeviceEventFunc
72+
sendMessage SendMessageFunc
7173
}
7274

73-
func NewTask(taskInfo mesos.TaskInfo, sendStatusFunc SendStatusFunc, sendDeviceEventFunc SendDeviceEventFunc) Task {
75+
func NewTask(taskInfo mesos.TaskInfo, sendStatusFunc SendStatusFunc, sendDeviceEventFunc SendDeviceEventFunc, sendMessageFunc SendMessageFunc) Task {
7476
var commandInfo common.TaskCommandInfo
7577

7678
tciData := taskInfo.GetData()
@@ -104,6 +106,7 @@ func NewTask(taskInfo mesos.TaskInfo, sendStatusFunc SendStatusFunc, sendDeviceE
104106
tci: &commandInfo,
105107
sendStatus: sendStatusFunc,
106108
sendDeviceEvent: sendDeviceEventFunc,
109+
sendMessage: sendMessageFunc,
107110
},
108111
}
109112
case controlmode.DIRECT:
@@ -115,6 +118,7 @@ func NewTask(taskInfo mesos.TaskInfo, sendStatusFunc SendStatusFunc, sendDeviceE
115118
tci: &commandInfo,
116119
sendStatus: sendStatusFunc,
117120
sendDeviceEvent: sendDeviceEventFunc,
121+
sendMessage: sendMessageFunc,
118122
},
119123
rpc: nil,
120124
}

executor/handlers.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,13 @@ func makeSendDeviceEventFunc(state *internalState) executable.SendDeviceEventFun
6060
}
6161
}
6262

63+
func makeSendMessageFunc(state *internalState) executable.SendMessageFunc {
64+
return func(message []byte) {
65+
// to send task events using state.
66+
state.messageCh <- message
67+
}
68+
}
69+
6370
func handleOutgoingMessage(state *internalState, message []byte) {
6471
_, _ = state.cli.Send(context.TODO(), calls.NonStreaming(calls.Message(message)))
6572
log.WithFields(logrus.Fields{
@@ -195,7 +202,8 @@ func handleLaunchEvent(state *internalState, taskInfo mesos.TaskInfo) {
195202

196203
myTask := executable.NewTask(taskInfo,
197204
makeSendStatusUpdateFunc(state, taskInfo),
198-
makeSendDeviceEventFunc(state))
205+
makeSendDeviceEventFunc(state),
206+
makeSendMessageFunc(state))
199207

200208
err := myTask.Launch()
201209

0 commit comments

Comments
 (0)