Skip to content

Commit dc813bd

Browse files
committed
Revert "make sure correct events are emitted"
This reverts commit 5c93032.
1 parent 40c78ab commit dc813bd

4 files changed

Lines changed: 158 additions & 323 deletions

File tree

pkg/runner/test_runner.go

Lines changed: 27 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import (
1616

1717
"github.com/facebookincubator/contest/pkg/cerrors"
1818
"github.com/facebookincubator/contest/pkg/config"
19-
"github.com/facebookincubator/contest/pkg/event"
2019
"github.com/facebookincubator/contest/pkg/event/testevent"
2120
"github.com/facebookincubator/contest/pkg/logging"
2221
"github.com/facebookincubator/contest/pkg/statectx"
@@ -330,41 +329,32 @@ func (tr *testRunner) waitStepRunners(ctx statectx.Context) error {
330329
tr.cond.Wait()
331330
}
332331
}()
333-
var err error
334332
select {
335333
case <-swch:
336334
tr.log.Debugf("step runners finished")
337335
tr.mu.Lock()
338336
defer tr.mu.Unlock()
339-
err = tr.checkStepRunners()
337+
return tr.checkStepRunners()
340338
case <-time.After(tr.shutdownTimeout):
341339
tr.log.Errorf("step runners failed to shut down correctly")
342-
tr.mu.Lock()
343-
defer tr.mu.Unlock()
344340
// If there is a step with an error set, use that.
345-
err = tr.checkStepRunners()
341+
err := tr.checkStepRunners()
346342
// If there isn't, enumerate ones that were still running at the time.
347343
nrerr := &cerrors.ErrTestStepsNeverReturned{}
348344
if err == nil {
349345
err = nrerr
350346
}
347+
tr.mu.Lock()
348+
defer tr.mu.Unlock()
351349
for _, ss := range tr.steps {
352350
if ss.stepRunning {
353351
nrerr.StepNames = append(nrerr.StepNames, ss.sb.TestStepLabel)
354352
// We cannot make the step itself return but we can at least release the reader.
355353
tr.safeCloseOutCh(ss)
356354
}
357355
}
356+
return err
358357
}
359-
// Emit step error events.
360-
for _, ss := range tr.steps {
361-
if ss.runErr != nil && ss.runErr != statectx.ErrPaused && ss.runErr != statectx.ErrCanceled {
362-
if err := ss.emitEvent(EventTestError, nil, ss.runErr.Error()); err != nil {
363-
tr.log.Errorf("failed to emit event: %s", err)
364-
}
365-
}
366-
}
367-
return err
368358
}
369359

370360
// targetRunner runs one target through all the steps of the pipeline.
@@ -403,28 +393,16 @@ loop:
403393
select {
404394
case ss.inCh <- ts.tgt:
405395
// Injected successfully.
406-
err := ss.ev.Emit(testevent.Data{EventName: target.EventTargetIn, Target: ts.tgt})
407396
tr.mu.Lock()
408397
ts.CurPhase = targetStepPhaseRun
409398
ss.numInjected++
410-
if err != nil {
411-
ss.runErr = fmt.Errorf("failed to report target injection: %w", err)
412-
ss.log.Errorf("%s", ss.runErr)
413-
}
414399
tr.mu.Unlock()
415400
tr.cond.Signal()
416-
if err != nil {
417-
break loop
418-
}
419401
case <-time.After(tr.stepInjectTimeout):
420402
tr.mu.Lock()
421403
ss.log.Errorf("timed out while injecting a target")
422404
ss.runErr = &cerrors.ErrTestTargetInjectionTimedOut{StepName: ss.sb.TestStepLabel}
423405
tr.mu.Unlock()
424-
err := ss.ev.Emit(testevent.Data{EventName: target.EventTargetInErr, Target: ts.tgt})
425-
if err != nil {
426-
ss.log.Errorf("failed to emit event: %s", err)
427-
}
428406
break loop
429407
case <-ctx.Done():
430408
log.Debugf("%s: canceled 1", ts)
@@ -438,15 +416,6 @@ loop:
438416
break loop
439417
}
440418
log.Debugf("%s: result for %s recd", ts, ss)
441-
var err error
442-
if res == nil {
443-
err = ss.emitEvent(target.EventTargetOut, ts.tgt, nil)
444-
} else {
445-
err = ss.emitEvent(target.EventTargetErr, ts.tgt, target.ErrPayload{Error: res.Error()})
446-
}
447-
if err != nil {
448-
ss.log.Errorf("failed to emit event: %s", err)
449-
}
450419
tr.mu.Lock()
451420
ts.CurPhase = targetStepPhaseEnd
452421
ts.res = res
@@ -495,21 +464,20 @@ func (tr *testRunner) runStepIfNeeded(ctx statectx.Context, ss *stepState) {
495464
go tr.stepReader(ctx, ss)
496465
}
497466

498-
// emitEvent emits the specified event with the specified JSON payload (if any).
499-
func (ss *stepState) emitEvent(name event.Name, tgt *target.Target, payload interface{}) error {
500-
var payloadJSON *json.RawMessage
501-
if payload != nil {
502-
payloadBytes, jmErr := json.Marshal(payload)
503-
if jmErr != nil {
504-
return fmt.Errorf("failed to marshal event: %w", jmErr)
505-
}
506-
pj := json.RawMessage(payloadBytes)
507-
payloadJSON = &pj
467+
// emitStepEvent emits an error event if step resulted in an error.
468+
func (ss *stepState) emitStepEvent(tgt *target.Target, err error) error {
469+
if err == nil {
470+
return nil
508471
}
472+
payload, jmErr := json.Marshal(err.Error())
473+
if jmErr != nil {
474+
return fmt.Errorf("failed to marshal event: %w", err)
475+
}
476+
rm := json.RawMessage(payload)
509477
errEv := testevent.Data{
510-
EventName: name,
478+
EventName: EventTestError,
511479
Target: tgt,
512-
Payload: payloadJSON,
480+
Payload: &rm,
513481
}
514482
return ss.ev.Emit(errEv)
515483
}
@@ -523,14 +491,19 @@ func (tr *testRunner) stepRunner(ctx statectx.Context, ss *stepState) {
523491
ss.stepRunning = false
524492
ss.runErr = &cerrors.ErrTestStepPaniced{
525493
StepName: ss.sb.TestStepLabel,
526-
StackTrace: fmt.Sprintf("%s / %s", r, debug.Stack()),
494+
StackTrace: string(debug.Stack()),
527495
}
528496
tr.mu.Unlock()
529497
tr.safeCloseOutCh(ss)
530498
}
531499
}()
532500
chans := test.TestStepChannels{In: ss.inCh, Out: ss.outCh, Err: ss.errCh}
533501
runErr := ss.sb.TestStep.Run(ctx, chans, ss.sb.Parameters, ss.ev)
502+
if err := ss.emitStepEvent(nil, runErr); err != nil {
503+
if ss.runErr == nil {
504+
ss.runErr = err
505+
}
506+
}
534507
tr.mu.Lock()
535508
ss.stepRunning = false
536509
if runErr != nil {
@@ -575,6 +548,11 @@ func (tr *testRunner) reportTargetResult(ctx statectx.Context, ss *stepState, tg
575548
if err != nil {
576549
return err
577550
}
551+
if res != nil {
552+
if err := ss.emitStepEvent(tgt, res); err != nil {
553+
return err
554+
}
555+
}
578556
select {
579557
case resCh <- res:
580558
break

0 commit comments

Comments
 (0)