Skip to content

Commit 0e147a5

Browse files
committed
[core] Add GetRootRole and SetGlobalVar to workflow & refactor callable
1 parent cc11b04 commit 0e147a5

9 files changed

Lines changed: 310 additions & 165 deletions

File tree

core/integration/odc/plugin.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -656,7 +656,7 @@ func (p *Plugin) ObjectStack(varStack map[string]string) (stack map[string]inter
656656
}
657657
odcTopologyFullname = "(xml, " + odc_topology + ")"
658658
}
659-
659+
660660
log.Debugf("ODC topology pretty name: %s", odcTopologyFullname)
661661
//FIXME: Disabled because env does not exist yet when GenerateEPNWorkflowScript is called
662662
//parsedEnvId, err := uid.FromString(envId)

core/workflow/callable/call.go

Lines changed: 12 additions & 136 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
/*
22
* === This file is part of ALICE O² ===
33
*
4-
* Copyright 2021 CERN and copyright holders of ALICE O².
4+
* Copyright 2021-2022 CERN and copyright holders of ALICE O².
55
* Author: Teo Mrnjavac <teo.mrnjavac@cern.ch>
66
*
77
* This program is free software: you can redistribute it and/or modify
@@ -27,19 +27,15 @@ package callable
2727
import (
2828
"errors"
2929
"fmt"
30-
"sort"
3130
"strconv"
32-
"strings"
3331
"sync"
3432
texttemplate "text/template"
3533
"time"
3634

3735
"github.com/AliceO2Group/Control/apricot"
38-
"github.com/AliceO2Group/Control/common/event"
3936
"github.com/AliceO2Group/Control/common/logger"
4037
"github.com/AliceO2Group/Control/common/logger/infologger"
4138
"github.com/AliceO2Group/Control/common/utils"
42-
"github.com/AliceO2Group/Control/common/utils/uid"
4339
"github.com/AliceO2Group/Control/configuration/template"
4440
"github.com/AliceO2Group/Control/core/integration"
4541
"github.com/AliceO2Group/Control/core/task"
@@ -48,19 +44,6 @@ import (
4844

4945
var log = logger.New(logrus.StandardLogger(), "callable")
5046

51-
type HookWeight int
52-
type Calls []*Call
53-
type Hooks []Hook
54-
type HooksMap map[HookWeight]Hooks
55-
type CallsMap map[HookWeight]Calls
56-
57-
type Hook interface {
58-
GetParentRole() interface{}
59-
GetParentRolePath() string
60-
GetName() string
61-
GetTraits() task.Traits
62-
}
63-
6447
type Call struct {
6548
Func string
6649
Return string
@@ -71,80 +54,16 @@ type Call struct {
7154
await chan error
7255
}
7356

74-
func ParseTriggerExpression(triggerExpr string) (triggerName string, triggerWeight HookWeight) {
75-
var (
76-
triggerWeightS string
77-
triggerWeightI int
78-
err error
79-
)
80-
81-
// Split the trigger expression of this task by + or -
82-
if splitIndex := strings.LastIndexFunc(triggerExpr, func(r rune) bool {
83-
return r == '+' || r == '-'
84-
}); splitIndex >= 0 {
85-
triggerName, triggerWeightS = triggerExpr[:splitIndex], triggerExpr[splitIndex:]
86-
} else {
87-
triggerName, triggerWeightS = triggerExpr, "+0"
88-
}
89-
90-
triggerWeightI, err = strconv.Atoi(triggerWeightS)
91-
if err != nil {
92-
log.Warnf("invalid trigger weight definition %s, defaulting to %s", triggerExpr, triggerName + "+0")
93-
triggerWeightI = 0
94-
}
95-
triggerWeight = HookWeight(triggerWeightI)
96-
97-
return
98-
}
99-
100-
func (m HooksMap) GetWeights() []HookWeight {
101-
weights := make([]int, len(m))
102-
i := 0
103-
for k, _ := range m {
104-
weights[i] = int(k)
105-
i++
106-
}
107-
sort.Ints(weights)
108-
out := make([]HookWeight, len(weights))
109-
for i, v := range weights{
110-
out[i] = HookWeight(v)
111-
}
112-
return out
113-
}
114-
115-
func (m CallsMap) GetWeights() []HookWeight {
116-
weights := make([]int, len(m))
117-
i := 0
118-
for k, _ := range m {
119-
weights[i] = int(k)
120-
i++
121-
}
122-
sort.Ints(weights)
123-
out := make([]HookWeight, len(weights))
124-
for i, v := range weights{
125-
out[i] = HookWeight(v)
126-
}
127-
return out
128-
}
129-
130-
func (s Hooks) FilterCalls() (calls Calls) {
131-
calls = make(Calls, 0)
132-
for _, v := range s {
133-
if c, ok := v.(*Call); ok {
134-
calls = append(calls, c)
135-
}
136-
}
137-
return
138-
}
57+
type Calls []*Call
13958

140-
func (s Hooks) FilterTasks() (tasks task.Tasks) {
141-
tasks = make(task.Tasks, 0)
142-
for _, v := range s {
143-
if t, ok := v.(*task.Task); ok {
144-
tasks = append(tasks, t)
145-
}
59+
func NewCall(funcCall string, returnVar string, varStack map[string]string, parent ParentRole) (call *Call) {
60+
return &Call{
61+
Func: funcCall,
62+
Return: returnVar,
63+
VarStack: varStack,
64+
Traits: parent.GetTaskTraits(),
65+
parentRole: parent,
14666
}
147-
return
14867
}
14968

15069
func (s Calls) CallAll() map[*Call]error {
@@ -182,16 +101,6 @@ func (s Calls) AwaitAll() map[*Call]error {
182101
return errors
183102
}
184103

185-
func NewCall(funcCall string, returnVar string, varStack map[string]string, parent ParentRole) (call *Call) {
186-
return &Call{
187-
Func: funcCall,
188-
Return: returnVar,
189-
VarStack: varStack,
190-
Traits: parent.GetTaskTraits(),
191-
parentRole: parent,
192-
}
193-
}
194-
195104
func (c *Call) Call() error {
196105
log.WithField("trigger", c.Traits.Trigger).
197106
WithField("await", c.Traits.Await).
@@ -201,9 +110,9 @@ func (c *Call) Call() error {
201110
output := "{{" + c.Func + "}}"
202111
returnVar := c.Return
203112
fields := template.Fields{
204-
template.WrapPointer(&output),
205-
template.WrapPointer(&returnVar),
206-
}
113+
template.WrapPointer(&output),
114+
template.WrapPointer(&returnVar),
115+
}
207116
c.VarStack["environment_id"] = c.parentRole.GetEnvironmentId().String()
208117
c.VarStack["__call_func"] = c.Func
209118
c.VarStack["__call_timeout"] = c.Traits.Timeout
@@ -261,36 +170,3 @@ func (c *Call) GetName() string {
261170
func (c *Call) GetTraits() task.Traits {
262171
return c.Traits
263172
}
264-
265-
type ParentRole interface {
266-
GetPath() string
267-
GetTaskTraits() task.Traits
268-
GetEnvironmentId() uid.ID
269-
ConsolidatedVarStack() (varStack map[string]string, err error)
270-
SendEvent(event.Event)
271-
SetRuntimeVar(key string, value string)
272-
GetCurrentRunNumber() uint32
273-
}
274-
275-
func AcquireTimeout(defaultTimeout time.Duration, varStack map[string]string, callName string, envId string) time.Duration {
276-
timeout := defaultTimeout
277-
timeoutStr, ok := varStack["__call_timeout"] // the Call interface ensures we'll find this key
278-
// see Call.Call in callable/call.go for details
279-
if ok {
280-
var err error
281-
timeout, err = time.ParseDuration(timeoutStr)
282-
if err != nil {
283-
timeout = defaultTimeout
284-
log.WithField("partition", envId).
285-
WithField("call", callName).
286-
WithField("default", timeout.String()).
287-
Warn("could not parse timeout declaration for hook call")
288-
}
289-
} else {
290-
log.WithField("partition", envId).
291-
WithField("call", callName).
292-
WithField("default", timeout.String()).
293-
Warn("could not get timeout declaration for hook call")
294-
}
295-
return timeout
296-
}

core/workflow/callable/hook.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* === This file is part of ALICE O² ===
3+
*
4+
* Copyright 2021-2022 CERN and copyright holders of ALICE O².
5+
* Author: Teo Mrnjavac <teo.mrnjavac@cern.ch>
6+
*
7+
* This program is free software: you can redistribute it and/or modify
8+
* it under the terms of the GNU General Public License as published by
9+
* the Free Software Foundation, either version 3 of the License, or
10+
* (at your option) any later version.
11+
*
12+
* This program is distributed in the hope that it will be useful,
13+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
* GNU General Public License for more details.
16+
*
17+
* You should have received a copy of the GNU General Public License
18+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
*
20+
* In applying this license CERN does not waive the privileges and
21+
* immunities granted to it by virtue of its status as an
22+
* Intergovernmental Organization or submit itself to any jurisdiction.
23+
*/
24+
25+
package callable
26+
27+
import "github.com/AliceO2Group/Control/core/task"
28+
29+
type Hook interface {
30+
GetParentRole() interface{}
31+
GetParentRolePath() string
32+
GetName() string
33+
GetTraits() task.Traits
34+
}
35+
36+
type Hooks []Hook
37+
38+
func (s Hooks) FilterCalls() (calls Calls) {
39+
calls = make(Calls, 0)
40+
for _, v := range s {
41+
if c, ok := v.(*Call); ok {
42+
calls = append(calls, c)
43+
}
44+
}
45+
return
46+
}
47+
48+
func (s Hooks) FilterTasks() (tasks task.Tasks) {
49+
tasks = make(task.Tasks, 0)
50+
for _, v := range s {
51+
if t, ok := v.(*task.Task); ok {
52+
tasks = append(tasks, t)
53+
}
54+
}
55+
return
56+
}

core/workflow/callable/maps.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* === This file is part of ALICE O² ===
3+
*
4+
* Copyright 2021-2022 CERN and copyright holders of ALICE O².
5+
* Author: Teo Mrnjavac <teo.mrnjavac@cern.ch>
6+
*
7+
* This program is free software: you can redistribute it and/or modify
8+
* it under the terms of the GNU General Public License as published by
9+
* the Free Software Foundation, either version 3 of the License, or
10+
* (at your option) any later version.
11+
*
12+
* This program is distributed in the hope that it will be useful,
13+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
* GNU General Public License for more details.
16+
*
17+
* You should have received a copy of the GNU General Public License
18+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
*
20+
* In applying this license CERN does not waive the privileges and
21+
* immunities granted to it by virtue of its status as an
22+
* Intergovernmental Organization or submit itself to any jurisdiction.
23+
*/
24+
25+
package callable
26+
27+
import "sort"
28+
29+
type HooksMap map[HookWeight]Hooks
30+
type CallsMap map[HookWeight]Calls
31+
type HookWeight int
32+
33+
func (m HooksMap) GetWeights() []HookWeight {
34+
weights := make([]int, len(m))
35+
i := 0
36+
for k, _ := range m {
37+
weights[i] = int(k)
38+
i++
39+
}
40+
sort.Ints(weights)
41+
out := make([]HookWeight, len(weights))
42+
for i, v := range weights {
43+
out[i] = HookWeight(v)
44+
}
45+
return out
46+
}
47+
48+
func (m CallsMap) GetWeights() []HookWeight {
49+
weights := make([]int, len(m))
50+
i := 0
51+
for k, _ := range m {
52+
weights[i] = int(k)
53+
i++
54+
}
55+
sort.Ints(weights)
56+
out := make([]HookWeight, len(weights))
57+
for i, v := range weights {
58+
out[i] = HookWeight(v)
59+
}
60+
return out
61+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* === This file is part of ALICE O² ===
3+
*
4+
* Copyright 2021-2022 CERN and copyright holders of ALICE O².
5+
* Author: Teo Mrnjavac <teo.mrnjavac@cern.ch>
6+
*
7+
* This program is free software: you can redistribute it and/or modify
8+
* it under the terms of the GNU General Public License as published by
9+
* the Free Software Foundation, either version 3 of the License, or
10+
* (at your option) any later version.
11+
*
12+
* This program is distributed in the hope that it will be useful,
13+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
* GNU General Public License for more details.
16+
*
17+
* You should have received a copy of the GNU General Public License
18+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
*
20+
* In applying this license CERN does not waive the privileges and
21+
* immunities granted to it by virtue of its status as an
22+
* Intergovernmental Organization or submit itself to any jurisdiction.
23+
*/
24+
25+
package callable
26+
27+
import (
28+
"github.com/AliceO2Group/Control/common/event"
29+
"github.com/AliceO2Group/Control/common/utils/uid"
30+
"github.com/AliceO2Group/Control/core/task"
31+
)
32+
33+
type ParentRole interface {
34+
GetPath() string
35+
GetTaskTraits() task.Traits
36+
GetEnvironmentId() uid.ID
37+
ConsolidatedVarStack() (varStack map[string]string, err error)
38+
SendEvent(event.Event)
39+
SetRuntimeVar(key string, value string)
40+
SetRuntimeVars(kv map[string]string)
41+
SetGlobalRuntimeVar(key string, value string)
42+
SetGlobalRuntimeVars(kv map[string]string)
43+
GetCurrentRunNumber() uint32
44+
}

0 commit comments

Comments
 (0)