Skip to content

Commit 54d3628

Browse files
committed
[core] Add environment ID to log messages in scheduler and task.Task
1 parent de59af7 commit 54d3628

2 files changed

Lines changed: 81 additions & 47 deletions

File tree

core/task/scheduler.go

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,8 @@ func runSchedulerController(ctx context.Context,
133133
}
134134
return
135135
}
136-
log.WithPrefix("scheduler").Info("disconnected")
136+
log.WithPrefix("scheduler").
137+
Info("disconnected")
137138
}),
138139
)
139140
}
@@ -437,6 +438,7 @@ func (state *schedulerState) resourceOffers(fidStore store.Singleton) events.Han
437438
taskClasses[i] = d.TaskClassName
438439
}
439440
log.WithPrefix("scheduler").
441+
WithField("partition", envId.String()).
440442
WithFields(logrus.Fields{
441443
"roles": strings.Join(rolePaths, ", "),
442444
"classes": strings.Join(taskClasses, ", "),
@@ -542,6 +544,7 @@ func (state *schedulerState) resourceOffers(fidStore store.Singleton) events.Han
542544
if !offerAttributes.Satisfy(descriptorConstraints[descriptor]) {
543545
if viper.GetBool("veryVerbose") {
544546
log.WithPrefix("scheduler").
547+
WithField("partition", envId.String()).
545548
WithFields(logrus.Fields{
546549
"taskClass": descriptor.TaskClassName,
547550
"constraints": descriptorConstraints[descriptor],
@@ -553,11 +556,14 @@ func (state *schedulerState) resourceOffers(fidStore store.Singleton) events.Han
553556
}
554557
continue
555558
}
556-
log.WithPrefix("scheduler").Debug("offer attributes satisfy constraints")
559+
log.WithPrefix("scheduler").
560+
WithField("partition", envId.String()).
561+
Debug("offer attributes satisfy constraints")
557562

558563
wants := state.taskman.GetWantsForDescriptor(descriptor)
559564
if wants == nil {
560565
log.WithPrefix("scheduler").
566+
WithField("partition", envId.String()).
561567
WithFields(logrus.Fields{
562568
"class": descriptor.TaskClassName,
563569
"level": infologger.IL_Devel,
@@ -569,6 +575,7 @@ func (state *schedulerState) resourceOffers(fidStore store.Singleton) events.Han
569575
if !Resources(remainingResourcesInOffer).Satisfy(wants) {
570576
if viper.GetBool("veryVerbose") {
571577
log.WithPrefix("scheduler").
578+
WithField("partition", envId.String()).
572579
WithFields(logrus.Fields{
573580
"taskClass": descriptor.TaskClassName,
574581
"wants": *wants,
@@ -619,9 +626,12 @@ func (state *schedulerState) resourceOffers(fidStore store.Singleton) events.Han
619626
taskPtr := state.taskman.newTaskForMesosOffer(&offer, descriptor, bindMap, targetExecutorId)
620627
if taskPtr == nil {
621628
log.WithPrefix("scheduler").
629+
WithField("partition", envId.String()).
622630
WithField("offerId", offer.ID.Value).
623631
Error("cannot get task for offer+descriptor, this should never happen")
624-
log.Trace("state unlock")
632+
log.WithPrefix("scheduler").
633+
WithField("partition", envId.String()).
634+
Trace("state unlock")
625635
continue
626636
}
627637

@@ -637,6 +647,7 @@ func (state *schedulerState) resourceOffers(fidStore store.Singleton) events.Han
637647
log.WithPrefix("scheduler").
638648
WithField("offerId", offer.ID.Value).
639649
WithError(err).
650+
WithField("partition", envId.String()).
640651
Error("cannot build task command")
641652
continue
642653
}
@@ -700,6 +711,7 @@ func (state *schedulerState) resourceOffers(fidStore store.Singleton) events.Han
700711
jsonCommand, err = json.Marshal(&runCommand)
701712
if err != nil {
702713
log.WithPrefix("scheduler").
714+
WithField("partition", envId.String()).
703715
WithFields(logrus.Fields{
704716
"error": err.Error(),
705717
"value": *runCommand.Value,
@@ -710,6 +722,7 @@ func (state *schedulerState) resourceOffers(fidStore store.Singleton) events.Han
710722
Error("cannot serialize mesos.CommandInfo for executor")
711723
state.Unlock()
712724
log.WithPrefix("scheduler").
725+
WithField("partition", envId.String()).
713726
Trace("state unlock")
714727
continue
715728
}
@@ -737,6 +750,7 @@ func (state *schedulerState) resourceOffers(fidStore store.Singleton) events.Han
737750
// Append executor resources to request
738751
executorResources := mesos.Resources(state.executor.Resources)
739752
log.WithPrefix("scheduler").
753+
WithField("partition", envId.String()).
740754
WithField("taskResources", resourcesRequest).
741755
WithField("executorResources", executorResources).
742756
Debug("creating Mesos task")
@@ -746,6 +760,7 @@ func (state *schedulerState) resourceOffers(fidStore store.Singleton) events.Han
746760

747761
executor := state.executor
748762
executor.ExecutorID.Value = taskPtr.GetExecutorId()
763+
envIdS := envId.String()
749764

750765
mesosTaskInfo := mesos.TaskInfo{
751766
Name: taskPtr.GetName(),
@@ -754,6 +769,10 @@ func (state *schedulerState) resourceOffers(fidStore store.Singleton) events.Han
754769
Executor: executor,
755770
Resources: resourcesRequest,
756771
Data: jsonCommand, // this ends up in LAUNCH for the executor
772+
Labels: &mesos.Labels{Labels: []mesos.Label{{
773+
Key: "environmentId",
774+
Value: &envIdS,
775+
}}},
757776
}
758777

759778
// We must run the executor with a special LD_LIBRARY_PATH because
@@ -770,6 +789,7 @@ func (state *schedulerState) resourceOffers(fidStore store.Singleton) events.Han
770789
}
771790

772791
log.WithPrefix("scheduler").
792+
WithField("partition", envId.String()).
773793
WithFields(logrus.Fields{
774794
"taskId": newTaskId,
775795
"offerId": offer.ID.Value,
@@ -1003,11 +1023,13 @@ func (state *schedulerState) tryReviveOffers(ctx context.Context) {
10031023
func doReviveOffers(ctx context.Context, state *schedulerState) {
10041024
err := calls.CallNoData(ctx, state.cli, calls.Revive())
10051025
if err != nil {
1006-
log.WithPrefix("scheduler").WithField("error", err.Error()).
1026+
log.WithPrefix("scheduler").
1027+
WithField("error", err.Error()).
10071028
Error("failed to revive offers")
10081029
return
10091030
}
1010-
log.WithPrefix("scheduler").Debug("revive offers done")
1031+
log.WithPrefix("scheduler").
1032+
Debug("revive offers done")
10111033
}
10121034

10131035
func (state *schedulerState) killTask(ctx context.Context, receiver controlcommands.MesosCommandTarget) (err error) {
@@ -1029,6 +1051,7 @@ func (state *schedulerState) sendCommand(ctx context.Context, command controlcom
10291051
err = calls.CallNoData(ctx, state.cli, message)
10301052

10311053
log.WithPrefix("scheduler").
1054+
WithField("partition", command.GetEnvironmentId().String()).
10321055
WithFields(logrus.Fields{
10331056
"agentId": receiver.AgentId.Value,
10341057
"executorId": receiver.ExecutorId.Value,
@@ -1085,7 +1108,8 @@ func logAllEvents() eventrules.Rule {
10851108
fields["raw"] = fmt.Sprintf("%+v", *e)
10861109
}
10871110
log.WithPrefix("scheduler").
1088-
WithFields(fields).Trace("incoming event")
1111+
WithFields(fields).
1112+
Trace("incoming event")
10891113
return ch(ctx, e, err)
10901114
}
10911115
}

core/task/task.go

Lines changed: 51 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ import (
5656
"github.com/spf13/viper"
5757
)
5858

59-
var log = logger.New(logrus.StandardLogger(),"task")
59+
var log = logger.New(logrus.StandardLogger(), "task")
6060

6161
type parentRole interface {
6262
UpdateStatus(Status)
@@ -75,11 +75,10 @@ type parentRole interface {
7575
SendEvent(event.Event)
7676
}
7777

78-
7978
type Traits struct {
80-
Trigger string
81-
Await string
82-
Timeout string
79+
Trigger string
80+
Await string
81+
Timeout string
8382
Critical bool
8483
}
8584

@@ -105,32 +104,31 @@ type Task interface {
105104
GetMesosCommandTarget() controlcommands.MesosCommandTarget
106105
}*/
107106

108-
109107
type Task struct {
110-
mu sync.RWMutex
111-
parent parentRole
112-
className string
108+
mu sync.RWMutex
109+
parent parentRole
110+
className string
113111
//configuration Descriptor
114-
name string
115-
hostname string
116-
agentId string
117-
offerId string
118-
taskId string
119-
executorId string
112+
name string
113+
hostname string
114+
agentId string
115+
offerId string
116+
taskId string
117+
executorId string
120118

121119
localBindMap channel.BindMap
122120

123-
status Status
124-
state State
125-
safeToStop bool
121+
status Status
122+
state State
123+
safeToStop bool
126124

127-
properties gera.StringMap
125+
properties gera.StringMap
128126

129127
GetTaskClass func() *taskclass.Class
130128
// ↑ to be filled in by NewTaskForMesosOffer in Manager
131129

132-
commandInfo *common.TaskCommandInfo
133-
pid string
130+
commandInfo *common.TaskCommandInfo
131+
pid string
134132
}
135133

136134
func (t *Task) IsSafeToStop() bool {
@@ -169,11 +167,11 @@ func (t *Task) IsLocked() bool {
169167

170168
func (t *Task) isLocked() bool {
171169
return len(t.hostname) > 0 &&
172-
len(t.agentId) > 0 &&
173-
len(t.offerId) > 0 &&
174-
len(t.taskId) > 0 &&
175-
len(t.executorId) > 0 &&
176-
t.parent != nil
170+
len(t.agentId) > 0 &&
171+
len(t.offerId) > 0 &&
172+
len(t.taskId) > 0 &&
173+
len(t.executorId) > 0 &&
174+
t.parent != nil
177175
}
178176

179177
func (t *Task) IsClaimable() bool {
@@ -208,11 +206,11 @@ func (t *Task) GetTaskCommandInfo() *common.TaskCommandInfo {
208206

209207
func (t *Task) buildSpecialVarStack(role parentRole) map[string]string {
210208
varStack := make(map[string]string)
211-
varStack["task_name"] = t.GetName()
212-
varStack["task_id"] = t.GetTaskId()
213-
varStack["task_class_name"] = t.GetClassName()
214-
varStack["task_hostname"] = t.GetHostname()
215-
varStack["environment_id"] = role.GetEnvironmentId().String()
209+
varStack["task_name"] = t.GetName()
210+
varStack["task_id"] = t.GetTaskId()
211+
varStack["task_class_name"] = t.GetClassName()
212+
varStack["task_hostname"] = t.GetHostname()
213+
varStack["environment_id"] = role.GetEnvironmentId().String()
216214
varStack["task_parent_role"] = role.GetPath()
217215
return varStack
218216
}
@@ -269,7 +267,9 @@ func (t *Task) BuildTaskCommand(role parentRole) (err error) {
269267
varStack, err = role.ConsolidatedVarStack()
270268
if err != nil {
271269
t.commandInfo = &common.TaskCommandInfo{}
272-
log.WithError(err).Error("cannot fetch variables stack for task command info")
270+
log.WithError(err).
271+
WithField("partition", role.GetEnvironmentId().String()).
272+
Error("cannot fetch variables stack for task command info")
273273
return
274274
}
275275

@@ -283,7 +283,9 @@ func (t *Task) BuildTaskCommand(role parentRole) (err error) {
283283
err = defaultFields.Execute(the.ConfSvc(), t.name, varStack, nil, make(map[string]texttemplate.Template), nil)
284284
if err != nil {
285285
t.commandInfo = &common.TaskCommandInfo{}
286-
log.WithError(err).Error("cannot resolve templates for task defaults")
286+
log.WithError(err).
287+
WithField("partition", role.GetEnvironmentId().String()).
288+
Error("cannot resolve templates for task defaults")
287289
}
288290

289291
varStack, err = gera.MakeStringMapWithMap(varStack).WrappedAndFlattened(gera.MakeStringMapWithMap(localDefaults))
@@ -296,14 +298,18 @@ func (t *Task) BuildTaskCommand(role parentRole) (err error) {
296298
err = varFields.Execute(the.ConfSvc(), t.name, varStack, nil, make(map[string]texttemplate.Template), nil)
297299
if err != nil {
298300
t.commandInfo = &common.TaskCommandInfo{}
299-
log.WithError(err).Error("cannot resolve templates for task vars")
301+
log.WithError(err).
302+
WithField("partition", role.GetEnvironmentId().String()).
303+
Error("cannot resolve templates for task vars")
300304
}
301305

302306
// We wrap the parent varStack around the task's already processed Defaults,
303307
// ensuring that any taskclass Defaults are overridden by anything else.
304308
varStack, err = gera.MakeStringMapWithMap(varStack).WrappedAndFlattened(gera.MakeStringMapWithMap(localVars))
305309
if err != nil {
306-
log.WithError(err).Error("cannot fetch task class defaults for task command info")
310+
log.WithError(err).
311+
WithField("partition", role.GetEnvironmentId().String()).
312+
Error("cannot fetch task class defaults for task command info")
307313
return
308314
}
309315

@@ -322,16 +328,18 @@ func (t *Task) BuildTaskCommand(role parentRole) (err error) {
322328
},
323329
append(
324330
template.WrapSliceItems(cmd.Env),
325-
template.WrapSliceItems(cmd.Arguments)...
326-
)...
331+
template.WrapSliceItems(cmd.Arguments)...,
332+
)...,
327333
)
328334
if cmd.Log != nil { // we only template it if it's defined
329335
fields = append(fields, template.WrapPointer(cmd.Log))
330336
}
331337
err = fields.Execute(the.ConfSvc(), t.name, varStack, nil, make(map[string]texttemplate.Template), nil)
332338
if err != nil {
333339
t.commandInfo = &common.TaskCommandInfo{}
334-
log.WithError(err).Error("cannot resolve templates for task command info")
340+
log.WithError(err).
341+
WithField("partition", role.GetEnvironmentId().String()).
342+
Error("cannot resolve templates for task command info")
335343
}
336344
}
337345

@@ -454,7 +462,7 @@ func (t *Task) SendEvent(ev event.Event) {
454462
defer t.mu.RUnlock()
455463

456464
if t.parent == nil {
457-
return
465+
return
458466
}
459467
t.parent.SendEvent(ev)
460468
}
@@ -561,7 +569,9 @@ func (t *Task) BuildPropertyMap(bindMap channel.BindMap) (propMap controlcommand
561569

562570
err = fields.Execute(the.ConfSvc(), t.name, varStack, objStack, make(map[string]texttemplate.Template), nil)
563571
if err != nil {
564-
log.WithError(err).Error("cannot resolve templates for property map")
572+
log.WithError(err).
573+
WithField("partition", t.GetParent().GetEnvironmentId().String()).
574+
Error("cannot resolve templates for property map")
565575
return
566576
}
567577

@@ -663,4 +673,4 @@ func (t *Task) GetTask() *Task {
663673
defer t.mu.RUnlock()
664674

665675
return t
666-
}
676+
}

0 commit comments

Comments
 (0)