@@ -16,7 +16,6 @@ import (
1616
1717 "github.com/facebookincubator/contest/pkg/cerrors"
1818 "github.com/facebookincubator/contest/pkg/config"
19- "github.com/facebookincubator/contest/pkg/event"
2019 "github.com/facebookincubator/contest/pkg/event/testevent"
2120 "github.com/facebookincubator/contest/pkg/logging"
2221 "github.com/facebookincubator/contest/pkg/statectx"
@@ -330,41 +329,32 @@ func (tr *testRunner) waitStepRunners(ctx statectx.Context) error {
330329 tr .cond .Wait ()
331330 }
332331 }()
333- var err error
334332 select {
335333 case <- swch :
336334 tr .log .Debugf ("step runners finished" )
337335 tr .mu .Lock ()
338336 defer tr .mu .Unlock ()
339- err = tr .checkStepRunners ()
337+ return tr .checkStepRunners ()
340338 case <- time .After (tr .shutdownTimeout ):
341339 tr .log .Errorf ("step runners failed to shut down correctly" )
342- tr .mu .Lock ()
343- defer tr .mu .Unlock ()
344340 // If there is a step with an error set, use that.
345- err = tr .checkStepRunners ()
341+ err : = tr .checkStepRunners ()
346342 // If there isn't, enumerate ones that were still running at the time.
347343 nrerr := & cerrors.ErrTestStepsNeverReturned {}
348344 if err == nil {
349345 err = nrerr
350346 }
347+ tr .mu .Lock ()
348+ defer tr .mu .Unlock ()
351349 for _ , ss := range tr .steps {
352350 if ss .stepRunning {
353351 nrerr .StepNames = append (nrerr .StepNames , ss .sb .TestStepLabel )
354352 // We cannot make the step itself return but we can at least release the reader.
355353 tr .safeCloseOutCh (ss )
356354 }
357355 }
356+ return err
358357 }
359- // Emit step error events.
360- for _ , ss := range tr .steps {
361- if ss .runErr != nil && ss .runErr != statectx .ErrPaused && ss .runErr != statectx .ErrCanceled {
362- if err := ss .emitEvent (EventTestError , nil , ss .runErr .Error ()); err != nil {
363- tr .log .Errorf ("failed to emit event: %s" , err )
364- }
365- }
366- }
367- return err
368358}
369359
370360// targetRunner runs one target through all the steps of the pipeline.
@@ -403,28 +393,16 @@ loop:
403393 select {
404394 case ss .inCh <- ts .tgt :
405395 // Injected successfully.
406- err := ss .ev .Emit (testevent.Data {EventName : target .EventTargetIn , Target : ts .tgt })
407396 tr .mu .Lock ()
408397 ts .CurPhase = targetStepPhaseRun
409398 ss .numInjected ++
410- if err != nil {
411- ss .runErr = fmt .Errorf ("failed to report target injection: %w" , err )
412- ss .log .Errorf ("%s" , ss .runErr )
413- }
414399 tr .mu .Unlock ()
415400 tr .cond .Signal ()
416- if err != nil {
417- break loop
418- }
419401 case <- time .After (tr .stepInjectTimeout ):
420402 tr .mu .Lock ()
421403 ss .log .Errorf ("timed out while injecting a target" )
422404 ss .runErr = & cerrors.ErrTestTargetInjectionTimedOut {StepName : ss .sb .TestStepLabel }
423405 tr .mu .Unlock ()
424- err := ss .ev .Emit (testevent.Data {EventName : target .EventTargetInErr , Target : ts .tgt })
425- if err != nil {
426- ss .log .Errorf ("failed to emit event: %s" , err )
427- }
428406 break loop
429407 case <- ctx .Done ():
430408 log .Debugf ("%s: canceled 1" , ts )
@@ -438,15 +416,6 @@ loop:
438416 break loop
439417 }
440418 log .Debugf ("%s: result for %s recd" , ts , ss )
441- var err error
442- if res == nil {
443- err = ss .emitEvent (target .EventTargetOut , ts .tgt , nil )
444- } else {
445- err = ss .emitEvent (target .EventTargetErr , ts .tgt , target.ErrPayload {Error : res .Error ()})
446- }
447- if err != nil {
448- ss .log .Errorf ("failed to emit event: %s" , err )
449- }
450419 tr .mu .Lock ()
451420 ts .CurPhase = targetStepPhaseEnd
452421 ts .res = res
@@ -495,21 +464,20 @@ func (tr *testRunner) runStepIfNeeded(ctx statectx.Context, ss *stepState) {
495464 go tr .stepReader (ctx , ss )
496465}
497466
498- // emitEvent emits the specified event with the specified JSON payload (if any).
499- func (ss * stepState ) emitEvent (name event.Name , tgt * target.Target , payload interface {}) error {
500- var payloadJSON * json.RawMessage
501- if payload != nil {
502- payloadBytes , jmErr := json .Marshal (payload )
503- if jmErr != nil {
504- return fmt .Errorf ("failed to marshal event: %w" , jmErr )
505- }
506- pj := json .RawMessage (payloadBytes )
507- payloadJSON = & pj
467+ // emitStepEvent emits an error event if step resulted in an error.
468+ func (ss * stepState ) emitStepEvent (tgt * target.Target , err error ) error {
469+ if err == nil {
470+ return nil
508471 }
472+ payload , jmErr := json .Marshal (err .Error ())
473+ if jmErr != nil {
474+ return fmt .Errorf ("failed to marshal event: %w" , err )
475+ }
476+ rm := json .RawMessage (payload )
509477 errEv := testevent.Data {
510- EventName : name ,
478+ EventName : EventTestError ,
511479 Target : tgt ,
512- Payload : payloadJSON ,
480+ Payload : & rm ,
513481 }
514482 return ss .ev .Emit (errEv )
515483}
@@ -523,14 +491,19 @@ func (tr *testRunner) stepRunner(ctx statectx.Context, ss *stepState) {
523491 ss .stepRunning = false
524492 ss .runErr = & cerrors.ErrTestStepPaniced {
525493 StepName : ss .sb .TestStepLabel ,
526- StackTrace : fmt . Sprintf ( "%s / %s" , r , debug .Stack ()),
494+ StackTrace : string ( debug .Stack ()),
527495 }
528496 tr .mu .Unlock ()
529497 tr .safeCloseOutCh (ss )
530498 }
531499 }()
532500 chans := test.TestStepChannels {In : ss .inCh , Out : ss .outCh , Err : ss .errCh }
533501 runErr := ss .sb .TestStep .Run (ctx , chans , ss .sb .Parameters , ss .ev )
502+ if err := ss .emitStepEvent (nil , runErr ); err != nil {
503+ if ss .runErr == nil {
504+ ss .runErr = err
505+ }
506+ }
534507 tr .mu .Lock ()
535508 ss .stepRunning = false
536509 if runErr != nil {
@@ -575,6 +548,11 @@ func (tr *testRunner) reportTargetResult(ctx statectx.Context, ss *stepState, tg
575548 if err != nil {
576549 return err
577550 }
551+ if res != nil {
552+ if err := ss .emitStepEvent (tgt , res ); err != nil {
553+ return err
554+ }
555+ }
578556 select {
579557 case resCh <- res :
580558 break
0 commit comments