Skip to content

Commit 886c99a

Browse files
committed
[core] Use connectivity.Shutdown for plugin client connection check
1 parent 71db408 commit 886c99a

3 files changed

Lines changed: 143 additions & 140 deletions

File tree

core/integration/dcs/plugin.go

Lines changed: 33 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ import (
4646
"github.com/imdario/mergo"
4747
"github.com/spf13/viper"
4848
"google.golang.org/grpc"
49+
"google.golang.org/grpc/connectivity"
4950
)
5051

5152
const (
@@ -339,22 +340,22 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
339340
return
340341
}
341342

342-
//if p.dcsClient.GetConnState() != connectivity.Ready {
343-
// err = fmt.Errorf("DCS client connection not available, StartOfRun impossible")
344-
//
345-
// log.WithError(err).
346-
// WithField("level", infologger.IL_Support).
347-
// WithField("endpoint", viper.GetString("dcsServiceEndpoint")).
348-
// WithField("runNumber", runNumber64).
349-
// WithField("partition", envId).
350-
// WithField("call", "StartOfRun").
351-
// Error("DCS error")
352-
//
353-
// call.VarStack["__call_error_reason"] = err.Error()
354-
// call.VarStack["__call_error"] = callFailedStr
355-
//
356-
// return
357-
//}
343+
if p.dcsClient.GetConnState() == connectivity.Shutdown {
344+
err = fmt.Errorf("DCS client connection not available, StartOfRun impossible")
345+
346+
log.WithError(err).
347+
WithField("level", infologger.IL_Support).
348+
WithField("endpoint", viper.GetString("dcsServiceEndpoint")).
349+
WithField("runNumber", runNumber64).
350+
WithField("partition", envId).
351+
WithField("call", "StartOfRun").
352+
Error("DCS error")
353+
354+
call.VarStack["__call_error_reason"] = err.Error()
355+
call.VarStack["__call_error"] = callFailedStr
356+
357+
return
358+
}
358359

359360
var stream dcspb.Configurator_StartOfRunClient
360361
timeout := callable.AcquireTimeout(DCS_GENERAL_OP_TIMEOUT, varStack, "SOR", envId)
@@ -636,22 +637,22 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
636637

637638
return
638639
}
639-
//if p.dcsClient.GetConnState() != connectivity.Ready {
640-
// err = fmt.Errorf("DCS client connection not available, EndOfRun impossible")
641-
//
642-
// log.WithError(err).
643-
// WithField("level", infologger.IL_Support).
644-
// WithField("endpoint", viper.GetString("dcsServiceEndpoint")).
645-
// WithField("runNumber", runNumber64).
646-
// WithField("partition", envId).
647-
// WithField("call", "EndOfRun").
648-
// Error("DCS error")
649-
//
650-
// call.VarStack["__call_error_reason"] = err.Error()
651-
// call.VarStack["__call_error"] = callFailedStr
652-
//
653-
// return
654-
//}
640+
if p.dcsClient.GetConnState() == connectivity.Shutdown {
641+
err = fmt.Errorf("DCS client connection not available, EndOfRun impossible")
642+
643+
log.WithError(err).
644+
WithField("level", infologger.IL_Support).
645+
WithField("endpoint", viper.GetString("dcsServiceEndpoint")).
646+
WithField("runNumber", runNumber64).
647+
WithField("partition", envId).
648+
WithField("call", "EndOfRun").
649+
Error("DCS error")
650+
651+
call.VarStack["__call_error_reason"] = err.Error()
652+
call.VarStack["__call_error"] = callFailedStr
653+
654+
return
655+
}
655656

656657
var stream dcspb.Configurator_EndOfRunClient
657658
timeout := callable.AcquireTimeout(DCS_GENERAL_OP_TIMEOUT, varStack, "EOR", envId)

core/integration/ddsched/plugin.go

Lines changed: 45 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ import (
4444
"github.com/AliceO2Group/Control/core/workflow/callable"
4545
"github.com/spf13/viper"
4646
"google.golang.org/grpc"
47+
"google.golang.org/grpc/connectivity"
4748
)
4849

4950
const (
@@ -256,20 +257,20 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
256257
return
257258
}
258259

259-
//if p.ddSchedClient.GetConnState() != connectivity.Ready {
260-
// err = fmt.Errorf("DD scheduler client connection not available, PartitionInitialize impossible")
261-
//
262-
// log.WithError(err).
263-
// WithField("level", infologger.IL_Support).
264-
// WithField("partition", envId).
265-
// WithField("call", "PartitionInitialize").
266-
// Error("DDsched error")
267-
//
268-
// call.VarStack["__call_error_reason"] = err.Error()
269-
// call.VarStack["__call_error"] = callFailedStr
270-
//
271-
// return
272-
//}
260+
if p.ddSchedClient.GetConnState() == connectivity.Shutdown {
261+
err = fmt.Errorf("DD scheduler client connection not available, PartitionInitialize impossible")
262+
263+
log.WithError(err).
264+
WithField("level", infologger.IL_Support).
265+
WithField("partition", envId).
266+
WithField("call", "PartitionInitialize").
267+
Error("DDsched error")
268+
269+
call.VarStack["__call_error_reason"] = err.Error()
270+
call.VarStack["__call_error"] = callFailedStr
271+
272+
return
273+
}
273274

274275
var (
275276
response *ddpb.PartitionResponse
@@ -393,21 +394,21 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
393394

394395
return
395396
}
396-
//if p.ddSchedClient.GetConnState() != connectivity.Ready {
397-
// err = fmt.Errorf("DD scheduler client connection not available, PartitionTerminate impossible")
398-
//
399-
// log.WithError(err).
400-
// WithField("level", infologger.IL_Support).
401-
// WithField("partition", envId).
402-
// WithField("endpoint", viper.GetString("ddSchedulerEndpoint")).
403-
// WithField("call", "PartitionTerminate").
404-
// Error("DDsched error")
405-
//
406-
// call.VarStack["__call_error_reason"] = err.Error()
407-
// call.VarStack["__call_error"] = callFailedStr
408-
//
409-
// return
410-
//}
397+
if p.ddSchedClient.GetConnState() == connectivity.Shutdown {
398+
err = fmt.Errorf("DD scheduler client connection not available, PartitionTerminate impossible")
399+
400+
log.WithError(err).
401+
WithField("level", infologger.IL_Support).
402+
WithField("partition", envId).
403+
WithField("endpoint", viper.GetString("ddSchedulerEndpoint")).
404+
WithField("call", "PartitionTerminate").
405+
Error("DDsched error")
406+
407+
call.VarStack["__call_error_reason"] = err.Error()
408+
call.VarStack["__call_error"] = callFailedStr
409+
410+
return
411+
}
411412

412413
var (
413414
response *ddpb.PartitionResponse
@@ -525,21 +526,21 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
525526

526527
return
527528
}
528-
//if p.ddSchedClient.GetConnState() != connectivity.Ready {
529-
// err = fmt.Errorf("DD scheduler client connection not available, EnsureTermination impossible")
530-
//
531-
// log.WithError(err).
532-
// WithField("level", infologger.IL_Support).
533-
// WithField("partition", envId).
534-
// WithField("endpoint", viper.GetString("ddSchedulerEndpoint")).
535-
// WithField("call", "EnsureTermination").
536-
// Error("DDsched error")
537-
//
538-
// call.VarStack["__call_error_reason"] = err.Error()
539-
// call.VarStack["__call_error"] = callFailedStr
540-
//
541-
// return
542-
//}
529+
if p.ddSchedClient.GetConnState() == connectivity.Shutdown {
530+
err = fmt.Errorf("DD scheduler client connection not available, EnsureTermination impossible")
531+
532+
log.WithError(err).
533+
WithField("level", infologger.IL_Support).
534+
WithField("partition", envId).
535+
WithField("endpoint", viper.GetString("ddSchedulerEndpoint")).
536+
WithField("call", "EnsureTermination").
537+
Error("DDsched error")
538+
539+
call.VarStack["__call_error_reason"] = err.Error()
540+
call.VarStack["__call_error"] = callFailedStr
541+
542+
return
543+
}
543544

544545
var (
545546
response *ddpb.PartitionResponse

core/integration/trg/plugin.go

Lines changed: 65 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ import (
4545
"github.com/AliceO2Group/Control/core/workflow/callable"
4646
"github.com/spf13/viper"
4747
"google.golang.org/grpc"
48+
"google.golang.org/grpc/connectivity"
4849
)
4950

5051
const (
@@ -409,22 +410,22 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
409410
return
410411
}
411412

412-
//if p.trgClient.GetConnState() != connectivity.Ready {
413-
// err = fmt.Errorf("TRG client connection not available, RunLoad impossible")
414-
//
415-
// log.WithError(err).
416-
// WithField("level", infologger.IL_Support).
417-
// WithField("endpoint", viper.GetString("trgServiceEndpoint")).
418-
// WithField("runNumber", runNumber64).
419-
// WithField("partition", envId).
420-
// WithField("call", "RunLoad").
421-
// Error("TRG error")
422-
//
423-
// call.VarStack["__call_error_reason"] = err.Error()
424-
// call.VarStack["__call_error"] = callFailedStr
425-
//
426-
// return
427-
//}
413+
if p.trgClient.GetConnState() == connectivity.Shutdown {
414+
err = fmt.Errorf("TRG client connection not available, RunLoad impossible")
415+
416+
log.WithError(err).
417+
WithField("level", infologger.IL_Support).
418+
WithField("endpoint", viper.GetString("trgServiceEndpoint")).
419+
WithField("runNumber", runNumber64).
420+
WithField("partition", envId).
421+
WithField("call", "RunLoad").
422+
Error("TRG error")
423+
424+
call.VarStack["__call_error_reason"] = err.Error()
425+
call.VarStack["__call_error"] = callFailedStr
426+
427+
return
428+
}
428429

429430
var response *trgpb.RunReply
430431
response, err = p.trgClient.RunLoad(context.Background(), &in, grpc.EmptyCallOption{})
@@ -545,22 +546,22 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
545546

546547
return
547548
}
548-
//if p.trgClient.GetConnState() != connectivity.Ready {
549-
// err = fmt.Errorf("TRG client connection not available, RunStart impossible")
550-
//
551-
// log.WithError(err).
552-
// WithField("level", infologger.IL_Support).
553-
// WithField("endpoint", viper.GetString("trgServiceEndpoint")).
554-
// WithField("runNumber", runNumber64).
555-
// WithField("partition", envId).
556-
// WithField("call", "RunStart").
557-
// Error("TRG error")
558-
//
559-
// call.VarStack["__call_error_reason"] = err.Error()
560-
// call.VarStack["__call_error"] = callFailedStr
561-
//
562-
// return
563-
//}
549+
if p.trgClient.GetConnState() == connectivity.Shutdown {
550+
err = fmt.Errorf("TRG client connection not available, RunStart impossible")
551+
552+
log.WithError(err).
553+
WithField("level", infologger.IL_Support).
554+
WithField("endpoint", viper.GetString("trgServiceEndpoint")).
555+
WithField("runNumber", runNumber64).
556+
WithField("partition", envId).
557+
WithField("call", "RunStart").
558+
Error("TRG error")
559+
560+
call.VarStack["__call_error_reason"] = err.Error()
561+
call.VarStack["__call_error"] = callFailedStr
562+
563+
return
564+
}
564565

565566
var response *trgpb.RunReply
566567

@@ -657,22 +658,22 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
657658

658659
return
659660
}
660-
//if p.trgClient.GetConnState() != connectivity.Ready {
661-
// err = fmt.Errorf("TRG client connection not available, RunStop impossible")
662-
//
663-
// log.WithError(err).
664-
// WithField("level", infologger.IL_Support).
665-
// WithField("endpoint", viper.GetString("trgServiceEndpoint")).
666-
// WithField("runNumber", runNumber64).
667-
// WithField("partition", envId).
668-
// WithField("call", "RunStop").
669-
// Error("TRG error")
670-
//
671-
// call.VarStack["__call_error_reason"] = err.Error()
672-
// call.VarStack["__call_error"] = callFailedStr
673-
//
674-
// return
675-
//}
661+
if p.trgClient.GetConnState() == connectivity.Shutdown {
662+
err = fmt.Errorf("TRG client connection not available, RunStop impossible")
663+
664+
log.WithError(err).
665+
WithField("level", infologger.IL_Support).
666+
WithField("endpoint", viper.GetString("trgServiceEndpoint")).
667+
WithField("runNumber", runNumber64).
668+
WithField("partition", envId).
669+
WithField("call", "RunStop").
670+
Error("TRG error")
671+
672+
call.VarStack["__call_error_reason"] = err.Error()
673+
call.VarStack["__call_error"] = callFailedStr
674+
675+
return
676+
}
676677

677678
var response *trgpb.RunReply
678679
response, err = p.trgClient.RunStop(context.Background(), &in, grpc.EmptyCallOption{})
@@ -772,22 +773,22 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
772773

773774
return
774775
}
775-
//if p.trgClient.GetConnState() != connectivity.Ready {
776-
// err = fmt.Errorf("TRG client connection not available, RunUnload impossible")
777-
//
778-
// log.WithError(err).
779-
// WithField("level", infologger.IL_Support).
780-
// WithField("endpoint", viper.GetString("trgServiceEndpoint")).
781-
// WithField("runNumber", runNumber64).
782-
// WithField("partition", envId).
783-
// WithField("call", "RunUnload").
784-
// Error("TRG error")
785-
//
786-
// call.VarStack["__call_error_reason"] = err.Error()
787-
// call.VarStack["__call_error"] = callFailedStr
788-
//
789-
// return
790-
//}
776+
if p.trgClient.GetConnState() == connectivity.Shutdown {
777+
err = fmt.Errorf("TRG client connection not available, RunUnload impossible")
778+
779+
log.WithError(err).
780+
WithField("level", infologger.IL_Support).
781+
WithField("endpoint", viper.GetString("trgServiceEndpoint")).
782+
WithField("runNumber", runNumber64).
783+
WithField("partition", envId).
784+
WithField("call", "RunUnload").
785+
Error("TRG error")
786+
787+
call.VarStack["__call_error_reason"] = err.Error()
788+
call.VarStack["__call_error"] = callFailedStr
789+
790+
return
791+
}
791792

792793
var response *trgpb.RunReply
793794
response, err = p.trgClient.RunUnload(context.Background(), &in, grpc.EmptyCallOption{})

0 commit comments

Comments
 (0)