@@ -13,6 +13,7 @@ import { UnsuccessfulWorkflowExecution } from '../exception/exception';
1313import { standardFormats } from '../formats' ;
1414import { DryRunEvent , DryRunSink } from '../sink/dryrun' ;
1515import { HostSink } from '../sink/host' ;
16+ import { Sink } from '../sink/sink' ;
1617import { HostTree } from '../tree/host-tree' ;
1718import { Tree } from '../tree/interface' ;
1819import { optimize } from '../tree/static' ;
@@ -96,6 +97,35 @@ export abstract class BaseWorkflow implements Workflow {
9697 return this . _lifeCycle . asObservable ( ) ;
9798 }
9899
100+ protected _createSinks ( ) : Sink [ ] {
101+ let error = false ;
102+
103+ const dryRunSink = new DryRunSink ( this . _host , this . _force ) ;
104+ const dryRunSubscriber = dryRunSink . reporter . subscribe ( event => {
105+ this . _reporter . next ( event ) ;
106+ error = error || ( event . kind == 'error' ) ;
107+ } ) ;
108+
109+ // We need two sinks if we want to output what will happen, and actually do the work.
110+ return [
111+ dryRunSink ,
112+ // Add a custom sink that clean ourselves and throws an error if an error happened.
113+ {
114+ commit ( ) {
115+ dryRunSubscriber . unsubscribe ( ) ;
116+ if ( error ) {
117+ return throwError ( new UnsuccessfulWorkflowExecution ( ) ) ;
118+ }
119+
120+ return of ( ) ;
121+ } ,
122+ } ,
123+
124+ // Only add a HostSink if this is not a dryRun.
125+ ...( ! this . _dryRun ? [ new HostSink ( this . _host , this . _force ) ] : [ ] ) ,
126+ ] ;
127+ }
128+
99129 execute (
100130 options : Partial < WorkflowExecutionContext > & RequiredWorkflowExecutionContext ,
101131 ) : Observable < void > {
@@ -112,17 +142,7 @@ export abstract class BaseWorkflow implements Workflow {
112142 || ( parentContext && parentContext . collection === options . collection ) ;
113143 const schematic = collection . createSchematic ( options . schematic , allowPrivate ) ;
114144
115- // We need two sinks if we want to output what will happen, and actually do the work.
116- // Note that fsSink is technically not used if `--dry-run` is passed, but creating the Sink
117- // does not have any side effect.
118- const dryRunSink = new DryRunSink ( this . _host , this . _force ) ;
119- const fsSink = new HostSink ( this . _host , this . _force ) ;
120-
121- let error = false ;
122- const dryRunSubscriber = dryRunSink . reporter . subscribe ( event => {
123- this . _reporter . next ( event ) ;
124- error = error || ( event . kind == 'error' ) ;
125- } ) ;
145+ const sinks = this . _createSinks ( ) ;
126146
127147 this . _lifeCycle . next ( { kind : 'workflow-start' } ) ;
128148
@@ -141,23 +161,18 @@ export abstract class BaseWorkflow implements Workflow {
141161 ) . pipe (
142162 map ( tree => optimize ( tree ) ) ,
143163 concatMap ( ( tree : Tree ) => {
144- return concat (
145- dryRunSink . commit ( tree ) . pipe ( ignoreElements ( ) ) ,
146- of ( tree ) ,
164+ // Process all sinks.
165+ return of ( tree ) . pipe (
166+ ...sinks . map ( sink => {
167+ return concatMap ( ( tree : Tree ) => {
168+ return concat (
169+ sink . commit ( tree ) . pipe ( ignoreElements ( ) ) ,
170+ of ( tree ) ,
171+ ) ;
172+ } ) ;
173+ } ) ,
147174 ) ;
148175 } ) ,
149- concatMap ( ( tree : Tree ) => {
150- dryRunSubscriber . unsubscribe ( ) ;
151- if ( error ) {
152- return throwError ( new UnsuccessfulWorkflowExecution ( ) ) ;
153- }
154-
155- if ( this . _dryRun ) {
156- return of ( ) ;
157- }
158-
159- return fsSink . commit ( tree ) . pipe ( defaultIfEmpty ( ) , last ( ) ) ;
160- } ) ,
161176 concatMap ( ( ) => {
162177 if ( this . _dryRun ) {
163178 return of ( ) ;
0 commit comments