@@ -65,9 +65,9 @@ export class JobOutputSchemaValidationError extends schema.SchemaValidationExcep
6565interface JobHandlerWithExtra extends JobHandler < JsonValue , JsonValue , JsonValue > {
6666 jobDescription : JobDescription ;
6767
68- argumentV : Observable < schema . SchemaValidator > ;
69- outputV : Observable < schema . SchemaValidator > ;
70- inputV : Observable < schema . SchemaValidator > ;
68+ argumentV : Promise < schema . SchemaValidator > ;
69+ outputV : Promise < schema . SchemaValidator > ;
70+ inputV : Promise < schema . SchemaValidator > ;
7171}
7272
7373function _jobShare < T > ( ) : MonoTypeOperatorFunction < T > {
@@ -159,9 +159,9 @@ export class SimpleScheduler<
159159
160160 const handlerWithExtra = Object . assign ( handler . bind ( undefined ) , {
161161 jobDescription : description ,
162- argumentV : this . _schemaRegistry . compile ( description . argument ) . pipe ( shareReplay ( 1 ) ) ,
163- inputV : this . _schemaRegistry . compile ( description . input ) . pipe ( shareReplay ( 1 ) ) ,
164- outputV : this . _schemaRegistry . compile ( description . output ) . pipe ( shareReplay ( 1 ) ) ,
162+ argumentV : this . _schemaRegistry . compile ( description . argument ) ,
163+ inputV : this . _schemaRegistry . compile ( description . input ) ,
164+ outputV : this . _schemaRegistry . compile ( description . output ) ,
165165 } ) as JobHandlerWithExtra ;
166166 this . _internalJobDescriptionMap . set ( name , handlerWithExtra ) ;
167167
@@ -284,6 +284,7 @@ export class SimpleScheduler<
284284 * Create the job.
285285 * @private
286286 */
287+ // eslint-disable-next-line max-lines-per-function
287288 private _createJob < A extends MinimumArgumentT , I extends MinimumInputT , O extends MinimumOutputT > (
288289 name : JobName ,
289290 argument : A ,
@@ -305,12 +306,14 @@ export class SimpleScheduler<
305306 . pipe (
306307 concatMap ( ( message ) =>
307308 handler . pipe (
308- switchMap ( ( handler ) => {
309+ switchMap ( async ( handler ) => {
309310 if ( handler === null ) {
310311 throw new JobDoesNotExistException ( name ) ;
311- } else {
312- return handler . inputV . pipe ( switchMap ( ( validate ) => validate ( message ) ) ) ;
313312 }
313+
314+ const validator = await handler . inputV ;
315+
316+ return validator ( message ) ;
314317 } ) ,
315318 ) ,
316319 ) ,
@@ -395,24 +398,20 @@ export class SimpleScheduler<
395398 }
396399
397400 return handler . pipe (
398- switchMap ( ( handler ) => {
401+ switchMap ( async ( handler ) => {
399402 if ( handler === null ) {
400403 throw new JobDoesNotExistException ( name ) ;
401- } else {
402- return handler . outputV . pipe (
403- switchMap ( ( validate ) => validate ( message . value ) ) ,
404- switchMap ( ( output ) => {
405- if ( ! output . success ) {
406- throw new JobOutputSchemaValidationError ( output . errors ) ;
407- }
408-
409- return of ( {
410- ...message ,
411- output : output . data as O ,
412- } as JobOutboundMessageOutput < O > ) ;
413- } ) ,
414- ) ;
415404 }
405+ const validate = await handler . outputV ;
406+ const output = await validate ( message . value ) ;
407+ if ( ! output . success ) {
408+ throw new JobOutputSchemaValidationError ( output . errors ) ;
409+ }
410+
411+ return {
412+ ...message ,
413+ output : output . data as O ,
414+ } as JobOutboundMessageOutput < O > ;
416415 } ) ,
417416 ) as Observable < JobOutboundMessage < O > > ;
418417 } ) ,
@@ -457,7 +456,7 @@ export class SimpleScheduler<
457456 return maybeObservable . pipe (
458457 // Keep the order of messages.
459458 concatMap ( ( message ) => {
460- return schemaRegistry . compile ( schema ) . pipe (
459+ return from ( schemaRegistry . compile ( schema ) ) . pipe (
461460 switchMap ( ( validate ) => validate ( message ) ) ,
462461 filter ( ( x ) => x . success ) ,
463462 map ( ( x ) => x . data as T ) ,
@@ -518,7 +517,7 @@ export class SimpleScheduler<
518517 }
519518
520519 // Validate the argument.
521- return handler . argumentV
520+ return from ( handler . argumentV )
522521 . pipe (
523522 switchMap ( ( validate ) => validate ( argument ) ) ,
524523 switchMap ( ( output ) => {
0 commit comments