Skip to content

Commit 8f3ff12

Browse files
committed
[core] Fix pending teardown with asynchronous ReleaseTasks
1 parent 7ee4d0b commit 8f3ff12

4 files changed

Lines changed: 122 additions & 8 deletions

File tree

common/event/announcetaskpidevent.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,16 @@
2424

2525
package event
2626

27+
import "github.com/AliceO2Group/Control/common/utils"
28+
2729
// AnnounceTaskPIDEvent is an event carrying the PID of a task, identified by
// its task ID. It embeds eventBase alongside the two task-specific fields.
type AnnounceTaskPIDEvent struct {
2830
eventBase
2931
TaskId string `json:"taskId"`
3032
PID int32 `json:"pid"`
31-
MessageType string `json:"_messageType"`
33+
}
34+
35+
// GetName returns the constant event name "ANNOUNCE_TASK_PID".
func (tmb *AnnounceTaskPIDEvent) GetName() string {
36+
return "ANNOUNCE_TASK_PID"
3237
}
3338

3439
func (tmb *AnnounceTaskPIDEvent) GetTaskId() string {
@@ -41,9 +46,12 @@ func (tmb *AnnounceTaskPIDEvent) GetTaskPID() int {
4146

4247
// NewAnnounceTaskPIDEvent builds an AnnounceTaskPIDEvent for the task with the
// given id and pid. The embedded eventBase is filled with a timestamp obtained
// from utils.NewUnixTimestamp() and the message type "AnnounceTaskPIDEvent".
func NewAnnounceTaskPIDEvent(id string, pid int32) (tm *AnnounceTaskPIDEvent) {
4348
tm = &AnnounceTaskPIDEvent{
49+
eventBase: eventBase{
50+
Timestamp: utils.NewUnixTimestamp(),
51+
MessageType: "AnnounceTaskPIDEvent",
52+
},
4453
TaskId: id,
4554
PID: pid,
46-
MessageType: "AnnounceTaskPIDEvent",
4755
}
4856
return tm
4957
}

common/event/tasksreleasedevent.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* === This file is part of ALICE O² ===
3+
*
4+
* Copyright 2020 CERN and copyright holders of ALICE O².
5+
* Author: Teo Mrnjavac <teo.mrnjavac@cern.ch>
6+
*
7+
* This program is free software: you can redistribute it and/or modify
8+
* it under the terms of the GNU General Public License as published by
9+
* the Free Software Foundation, either version 3 of the License, or
10+
* (at your option) any later version.
11+
*
12+
* This program is distributed in the hope that it will be useful,
13+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
* GNU General Public License for more details.
16+
*
17+
* You should have received a copy of the GNU General Public License
18+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
*
20+
* In applying this license CERN does not waive the privileges and
21+
* immunities granted to it by virtue of its status as an
22+
* Intergovernmental Organization or submit itself to any jurisdiction.
23+
*/
24+
25+
package event
26+
27+
import (
28+
"github.com/AliceO2Group/Control/common/utils"
29+
"github.com/pborman/uuid"
30+
)
31+
32+
// TasksReleasedEvent reports the outcome of a task release request for one
// environment: the IDs of the tasks that were released, and the errors for the
// tasks that could not be released, keyed by task ID.
type TasksReleasedEvent struct {
33+
eventBase
34+
EnvironmentId uuid.Array `json:"environmentId"`
35+
TaskIdsReleased []string `json:"taskIdsReleased"`
36+
// NOTE(review): encoding/json encodes an interface value by its dynamic
// type, so error values without exported fields or a custom marshaler would
// presumably come out as {} — confirm whether this event is ever serialized.
TaskReleaseErrors map[string]error `json:"taskReleaseErrors"`
37+
}
38+
39+
// GetName returns the constant event name "TASK_RELEASED".
// NOTE(review): the name is singular while the type is TasksReleasedEvent —
// confirm this is the intended identifier.
func (tr *TasksReleasedEvent) GetName() string {
40+
return "TASK_RELEASED"
41+
}
42+
43+
// GetEnvironmentId returns the ID of the environment this event refers to.
func (tr *TasksReleasedEvent) GetEnvironmentId() uuid.Array {
44+
return tr.EnvironmentId
45+
}
46+
47+
// GetTaskIds returns the TaskIdsReleased slice as stored in the event (the
// slice itself, not a copy).
func (tr *TasksReleasedEvent) GetTaskIds() []string {
48+
return tr.TaskIdsReleased
49+
}
50+
51+
// GetTaskReleaseErrors returns the map of release errors keyed by task ID, as
// stored in the event (the map itself, not a copy).
func (tr *TasksReleasedEvent) GetTaskReleaseErrors() map[string]error {
52+
return tr.TaskReleaseErrors
53+
}
54+
55+
// NewTasksReleasedEvent builds a TasksReleasedEvent for the environment envId.
// The embedded eventBase is filled with a timestamp obtained from
// utils.NewUnixTimestamp() and the message type "TasksReleasedEvent".
// taskIdsReleased and taskReleaseErrors are stored as passed, without copying.
func NewTasksReleasedEvent(envId uuid.Array, taskIdsReleased []string, taskReleaseErrors map[string]error) (tr *TasksReleasedEvent) {
56+
tr = &TasksReleasedEvent{
57+
eventBase: eventBase{
58+
Timestamp: utils.NewUnixTimestamp(),
59+
MessageType: "TasksReleasedEvent",
60+
},
61+
EnvironmentId: envId,
62+
TaskIdsReleased: taskIdsReleased,
63+
TaskReleaseErrors: taskReleaseErrors,
64+
}
65+
return tr
66+
}

core/environment/manager.go

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,17 +43,19 @@ import (
4343
)
4444

4545
// Manager holds the known environments (m), a handle to the task manager, and
// the channel on which events arrive from other components.
type Manager struct {
46-
mu sync.RWMutex
47-
m map[uid.ID]*Environment
48-
taskman *task.ManagerV2
49-
incomingEventCh <-chan event.Event
46+
mu sync.RWMutex
47+
m map[uid.ID]*Environment
48+
taskman *task.ManagerV2
49+
incomingEventCh <-chan event.Event
50+
// pendingTeardownsCh maps an environment to the channel on which a pending
// TeardownEnvironment call waits for the matching TasksReleasedEvent; the
// event loop sends the event, closes the channel and deletes the entry.
pendingTeardownsCh map[uid.ID]chan *event.TasksReleasedEvent
5051
}
5152

5253
func NewEnvManager(tm *task.ManagerV2, incomingEventCh <-chan event.Event) *Manager {
5354
envman := &Manager{
5455
m: make(map[uid.ID]*Environment),
5556
taskman: tm,
5657
incomingEventCh: incomingEventCh,
58+
pendingTeardownsCh: make(map[uid.ID]chan *event.TasksReleasedEvent),
5759
}
5860

5961
go func() {
@@ -63,6 +65,14 @@ func NewEnvManager(tm *task.ManagerV2, incomingEventCh <-chan event.Event) *Mana
6365
switch typedEvent := incomingEvent.(type) {
6466
case event.DeviceEvent:
6567
envman.handleDeviceEvent(typedEvent)
68+
case *event.TasksReleasedEvent:
69+
// If we got a TasksReleasedEvent, it must be matched with a pending
70+
// environment teardown.
71+
if thisEnvCh, ok := envman.pendingTeardownsCh[typedEvent.GetEnvironmentId()]; ok {
72+
thisEnvCh <- typedEvent
73+
close(thisEnvCh)
74+
delete(envman.pendingTeardownsCh, typedEvent.GetEnvironmentId())
75+
}
6676
default:
6777
// noop
6878
}
@@ -167,7 +177,30 @@ func (envs *Manager) TeardownEnvironment(environmentId uid.ID, force bool) error
167177
}
168178

169179
taskmanMessage := task.NewEnvironmentMessage(taskop.ReleaseTasks,environmentId.Array(), env.Workflow().GetTasks(), nil)
180+
181+
pendingCh := make(chan *event.TasksReleasedEvent)
182+
envs.pendingTeardownsCh[environmentId.Array()] = pendingCh
183+
envs.mu.Unlock()
170184
envs.taskman.MessageChannel <- taskmanMessage
185+
186+
incomingEv := <- pendingCh
187+
envs.mu.Lock()
188+
189+
// If some tasks failed to release
190+
if taskReleaseErrors := incomingEv.GetTaskReleaseErrors(); len(taskReleaseErrors) > 0 {
191+
for taskId, err := range taskReleaseErrors {
192+
log.WithFields(logrus.Fields{
193+
"taskId": taskId,
194+
"environmentId": environmentId,
195+
}).
196+
WithError(err).
197+
Warn("task failed to release")
198+
}
199+
err = fmt.Errorf("%d tasks failed to release for environment %s",
200+
len(taskReleaseErrors), environmentId)
201+
return err
202+
}
203+
171204
// err = envs.taskman.ReleaseTasks(environmentId.Array(), env.Workflow().GetTasks())
172205
// if err != nil {
173206
// return err

core/task/managerV2.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -385,18 +385,25 @@ func (m *ManagerV2) releaseTasks(envId uuid.Array, tasks Tasks) error {
385385
m.mu.Lock()
386386
defer m.mu.Unlock()
387387

388+
taskReleaseErrors := make(map[string]error)
389+
taskIdsReleased := make([]string, 0)
390+
388391
for _, task := range tasks {
389392
err := m.releaseTask(envId, task)
390-
if err != nil {
393+
if err == nil {
394+
taskIdsReleased = append(taskIdsReleased, task.GetTaskId())
395+
} else {
391396
switch err.(type) {
392397
case TaskAlreadyReleasedError:
393398
continue
394399
default:
395-
return err
400+
taskReleaseErrors[task.GetTaskId()] = err
396401
}
397402
}
398403
}
399404

405+
m.internalEventCh <- event.NewTasksReleasedEvent(envId, taskIdsReleased, taskReleaseErrors)
406+
400407
return nil
401408
}
402409

0 commit comments

Comments
 (0)