Skip to content

Commit ee4584f

Browse files
miltalexteo
authored andcommitted
[core] rebase taskman
1 parent 9a91bdd commit ee4584f

7 files changed

Lines changed: 27 additions & 27 deletions

File tree

common/event/tasksreleasedevent.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,12 @@ package event
2626

2727
import (
2828
"github.com/AliceO2Group/Control/common/utils"
29-
"github.com/pborman/uuid"
29+
"github.com/AliceO2Group/Control/common/utils/uid"
3030
)
3131

3232
type TasksReleasedEvent struct {
3333
eventBase
34-
EnvironmentId uuid.Array `json:"environmentId"`
34+
EnvironmentId uid.ID `json:"environmentId"`
3535
TaskIdsReleased []string `json:"taskIdsReleased"`
3636
TaskReleaseErrors map[string]error `json:"taskReleaseErrors"`
3737
}
@@ -40,7 +40,7 @@ func (tr *TasksReleasedEvent) GetName() string {
4040
return "TASK_RELEASED"
4141
}
4242

43-
func (tr *TasksReleasedEvent) GetEnvironmentId() uuid.Array {
43+
func (tr *TasksReleasedEvent) GetEnvironmentId() uid.ID {
4444
return tr.EnvironmentId
4545
}
4646

@@ -52,7 +52,7 @@ func (tr *TasksReleasedEvent) GetTaskReleaseErrors() map[string]error {
5252
return tr.TaskReleaseErrors
5353
}
5454

55-
func NewTasksReleasedEvent(envId uuid.Array, taskIdsReleased []string, taskReleaseErrors map[string]error) (tr *TasksReleasedEvent) {
55+
func NewTasksReleasedEvent(envId uid.ID, taskIdsReleased []string, taskReleaseErrors map[string]error) (tr *TasksReleasedEvent) {
5656
tr = &TasksReleasedEvent{
5757
eventBase: eventBase{
5858
Timestamp: utils.NewUnixTimestamp(),

core/environment/manager.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ func (envs *Manager) CreateEnvironment(workflowPath string, userVars map[string]
126126
if err != nil {
127127
envState := env.CurrentState()
128128
envTasks := env.Workflow().GetTasks()
129-
taskmanMessage := task.NewEnvironmentMessage(taskop.ReleaseTasks,env.id.Array(), envTasks, nil)
129+
taskmanMessage := task.NewEnvironmentMessage(taskop.ReleaseTasks,env.id, envTasks, nil)
130130
envs.taskman.MessageChannel <- taskmanMessage
131131
// rlsErr := envs.taskman.ReleaseTasks(env.id.Array(), envTasks)
132132
// if rlsErr != nil {
@@ -176,10 +176,10 @@ func (envs *Manager) TeardownEnvironment(environmentId uid.ID, force bool) error
176176
return errors.New(fmt.Sprintf("cannot teardown environment in state %s", env.CurrentState()))
177177
}
178178

179-
taskmanMessage := task.NewEnvironmentMessage(taskop.ReleaseTasks,environmentId.Array(), env.Workflow().GetTasks(), nil)
179+
taskmanMessage := task.NewEnvironmentMessage(taskop.ReleaseTasks,environmentId, env.Workflow().GetTasks(), nil)
180180

181181
pendingCh := make(chan *event.TasksReleasedEvent)
182-
envs.pendingTeardownsCh[environmentId.Array()] = pendingCh
182+
envs.pendingTeardownsCh[environmentId] = pendingCh
183183
envs.mu.Unlock()
184184
envs.taskman.MessageChannel <- taskmanMessage
185185

@@ -287,7 +287,7 @@ func (envs *Manager) handleDeviceEvent(evt event.DeviceEvent) {
287287
// If it's an update following a HOOK execution
288288
if t.GetControlMode() == controlmode.HOOK {
289289
isHook = true
290-
env, err := envs.environment(t.GetEnvironmentId().UUID())
290+
env, err := envs.environment(t.GetEnvironmentId())
291291
if err != nil {
292292
log.WithPrefix("scheduler").WithError(err).Error("cannot find environment for DeviceEvent")
293293
}
@@ -316,7 +316,7 @@ func (envs *Manager) handleDeviceEvent(evt event.DeviceEvent) {
316316
log.WithPrefix("scheduler").Error("cannot find task for DeviceEvent END_OF_STREAM")
317317
return
318318
}
319-
env, err := envs.environment(t.GetEnvironmentId().UUID())
319+
env, err := envs.environment(t.GetEnvironmentId())
320320
if err != nil {
321321
log.WithPrefix("scheduler").WithError(err).Error("cannot find environment for DeviceEvent")
322322
}

core/environment/transition_configure.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ func (t ConfigureTransition) do(env *Environment) (err error) {
143143
taskDescriptors := wf.GenerateTaskDescriptors()
144144
if len(taskDescriptors) != 0 {
145145
// err = t.taskman.AcquireTasks(env.Id().Array(), taskDescriptors)
146-
taskmanMessage := task.NewEnvironmentMessage(taskop.AcquireTasks, env.Id().Array(), nil, taskDescriptors)
146+
taskmanMessage := task.NewEnvironmentMessage(taskop.AcquireTasks, env.Id(), nil, taskDescriptors)
147147
t.taskman.MessageChannel <- taskmanMessage
148148
}
149149
if err != nil {
@@ -181,7 +181,7 @@ func (t ConfigureTransition) do(env *Environment) (err error) {
181181

182182
if len(tasks) != 0 {
183183
// err = t.taskman.ConfigureTasks(env.Id().Array(), tasks)
184-
taskmanMessage := task.NewEnvironmentMessage(taskop.ConfigureTasks, env.Id().Array(), tasks, nil)
184+
taskmanMessage := task.NewEnvironmentMessage(taskop.ConfigureTasks, env.Id(), tasks, nil)
185185
t.taskman.MessageChannel <- taskmanMessage
186186
// if err != nil {
187187
// return

core/server.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
package core
2727

2828
import (
29-
"fmt"
3029
"runtime"
3130
"sort"
3231
"strconv"
@@ -44,8 +43,6 @@ import (
4443

4544
"github.com/AliceO2Group/Control/core/environment"
4645
"github.com/AliceO2Group/Control/core/protos"
47-
"github.com/looplab/fsm"
48-
"github.com/mesos/mesos-go/api/v1/lib/extras/store"
4946
"google.golang.org/grpc/codes"
5047
"google.golang.org/grpc/reflection"
5148
"google.golang.org/grpc/status"

core/task/coreevent.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ import (
2828
"github.com/AliceO2Group/Control/core/controlcommands"
2929
"github.com/AliceO2Group/Control/core/task/taskop"
3030
"github.com/mesos/mesos-go/api/v1/lib"
31-
"github.com/pborman/uuid"
31+
"github.com/AliceO2Group/Control/common/utils/uid"
32+
3233
)
3334

3435
type TaskmanMessage struct {
@@ -52,12 +53,12 @@ func (tm *TaskmanMessage) GetMessageType() taskop.MessageType {
5253
}
5354

5455
type environmentMessage struct {
55-
envId uuid.Array
56+
envId uid.ID
5657
tasks Tasks
5758
descriptors Descriptors
5859
}
5960

60-
func (em *environmentMessage) GetEnvironmentId() (envid uuid.Array) {
61+
func (em *environmentMessage) GetEnvironmentId() (envid uid.ID) {
6162
if em == nil {
6263
return
6364
}
@@ -78,7 +79,7 @@ func (em *environmentMessage) GetDescriptors() Descriptors {
7879
return em.descriptors
7980
}
8081

81-
func NewEnvironmentMessage(mt taskop.MessageType, envId uuid.Array, tasks Tasks, desc Descriptors) (t *TaskmanMessage) {
82+
func NewEnvironmentMessage(mt taskop.MessageType, envId uid.ID, tasks Tasks, desc Descriptors) (t *TaskmanMessage) {
8283
t = NewTaskmanMessage(mt)
8384
t.environmentMessage = environmentMessage{
8485
envId: envId,

core/task/managerV2.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import (
3939
pb "github.com/AliceO2Group/Control/core/protos"
4040
"github.com/AliceO2Group/Control/core/task/taskop"
4141
"github.com/AliceO2Group/Control/core/the"
42+
"github.com/AliceO2Group/Control/common/utils/uid"
4243
"github.com/mesos/mesos-go/api/v1/lib/extras/store"
4344

4445
"github.com/AliceO2Group/Control/common/utils"
@@ -218,7 +219,7 @@ func (m *ManagerV2) RefreshClasses(taskClassesRequired []string) (err error) {
218219
return
219220
}
220221

221-
func (m *ManagerV2) acquireTasks(envId uuid.Array, taskDescriptors Descriptors) (err error) {
222+
func (m *ManagerV2) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err error) {
222223

223224
/*
224225
Here's what's gonna happen:
@@ -375,7 +376,7 @@ func (m *ManagerV2) acquireTasks(envId uuid.Array, taskDescriptors Descriptors)
375376
return
376377
}
377378

378-
func (m *ManagerV2) releaseTasks(envId uuid.Array, tasks Tasks) error {
379+
func (m *ManagerV2) releaseTasks(envId uid.ID, tasks Tasks) error {
379380

380381
taskReleaseErrors := make(map[string]error)
381382
taskIdsReleased := make([]string, 0)
@@ -399,7 +400,7 @@ func (m *ManagerV2) releaseTasks(envId uuid.Array, tasks Tasks) error {
399400
return nil
400401
}
401402

402-
func (m *ManagerV2) releaseTask(envId uuid.Array, task *Task) error {
403+
func (m *ManagerV2) releaseTask(envId uid.ID, task *Task) error {
403404
if task == nil {
404405
return TaskNotFoundError{}
405406
}
@@ -412,7 +413,7 @@ func (m *ManagerV2) releaseTask(envId uuid.Array, task *Task) error {
412413
return nil
413414
}
414415

415-
func (m *ManagerV2) configureTasks(envId uuid.Array, tasks Tasks) error {
416+
func (m *ManagerV2) configureTasks(envId uid.ID, tasks Tasks) error {
416417
notify := make(chan controlcommands.MesosCommandResponse)
417418
receivers, err := tasks.GetMesosCommandTargets()
418419
if err != nil {

core/task/scheduler.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import (
3838
"time"
3939

4040
"github.com/AliceO2Group/Control/common/controlmode"
41+
"github.com/AliceO2Group/Control/common/utils/uid"
4142
"github.com/AliceO2Group/Control/core/task/channel"
4243
"github.com/AliceO2Group/Control/core/task/schedutil"
4344
"github.com/spf13/viper"
@@ -184,7 +185,7 @@ func (state *schedulerState) reconciliationCall() events.HandlerFunc {
184185
}
185186

186187
// Update metrics when we receive an offer
187-
func (state *schedulerState)trackOffersReceived() eventrules.Rule {
188+
func (state *schedulerState) trackOffersReceived() eventrules.Rule {
188189
return func(ctx context.Context, e *scheduler.Event, err error, chain eventrules.Chain) (context.Context, *scheduler.Event, error) {
189190
if err == nil {
190191
state.metricsAPI.offersReceived.Int(len(e.GetOffers().GetOffers()))
@@ -406,7 +407,7 @@ func (state *schedulerState) resourceOffers(fidStore store.Singleton) events.Han
406407
}).Trace("received offers")
407408
}
408409

409-
var descriptorsToDeploy Descriptors
410+
var descriptorsStillToDeploy Descriptors
410411
select {
411412
case descriptorsStillToDeploy = <- state.tasksToDeploy:
412413
if viper.GetBool("veryVerbose") {
@@ -461,8 +462,8 @@ func (state *schedulerState) resourceOffers(fidStore store.Singleton) events.Han
461462
// for a likely significant multinode launch performance increase
462463
for _, offer := range offers {
463464
var (
464-
remainingResources = mesos.Resources(offer.Resources)
465-
tasks = make([]mesos.TaskInfo, 0)
465+
remainingResourcesInOffer = mesos.Resources(offer.Resources)
466+
taskInfosToLaunchForCurrentOffer = make([]mesos.TaskInfo, 0)
466467
tasksDeployedForCurrentOffer = make(DeploymentMap)
467468
targetExecutorId = mesos.ExecutorID{}
468469
)
@@ -526,7 +527,7 @@ func (state *schedulerState) resourceOffers(fidStore store.Singleton) events.Han
526527
Warning("no resource demands for descriptor, invalid class perhaps?")
527528
continue
528529
}
529-
if !Resources(remainingResources).Satisfy(wants) {
530+
if !Resources(remainingResourcesInOffer).Satisfy(wants) {
530531
if viper.GetBool("veryVerbose") {
531532
log.WithPrefix("scheduler").
532533
WithFields(logrus.Fields{

0 commit comments

Comments
 (0)