2323import feast .core .model .*;
2424import feast .proto .core .FeatureSetProto ;
2525import java .time .Instant ;
26+ import java .util .HashSet ;
2627import java .util .List ;
2728import java .util .Map ;
29+ import java .util .Objects ;
2830import java .util .Optional ;
2931import java .util .concurrent .Callable ;
3032import java .util .concurrent .ExecutionException ;
3638import java .util .stream .Collectors ;
3739import lombok .Getter ;
3840import lombok .extern .slf4j .Slf4j ;
39- import org .apache .beam .vendor .guava .v26_0_jre .com .google .common .hash .Hashing ;
4041
4142/**
4243 * JobUpdateTask is a callable that starts or updates a job given a set of featureSetSpecs, as well
43- * as their source and sink.
44+ * their source and sink to transition to targetStatus .
4445 *
4546 * <p>When complete, the JobUpdateTask returns the updated Job object to be pushed to the db.
4647 */
4748@ Slf4j
4849@ Getter
4950public class JobUpdateTask implements Callable <Job > {
5051
52+ /**
53+ * JobTargetStatus enum defines the possible target statuses that JobUpdateTask can transition a
54+ * Job to.
55+ */
56+ public enum JobTargetStatus {
57+ RUNNING ,
58+ ABORTED
59+ }
60+
5161 private final List <FeatureSet > featureSets ;
5262 private final Source source ;
5363 private final Store store ;
64+ private final JobTargetStatus targetStatus ;
5465 private final Optional <Job > currentJob ;
5566 private final JobManager jobManager ;
5667 private final long jobUpdateTimeoutSeconds ;
@@ -62,38 +73,44 @@ public JobUpdateTask(
6273 Store store ,
6374 Optional <Job > currentJob ,
6475 JobManager jobManager ,
65- long jobUpdateTimeoutSeconds ) {
66-
76+ long jobUpdateTimeoutSeconds ,
77+ JobTargetStatus targetStatus ) {
6778 this .featureSets = featureSets ;
6879 this .source = source ;
6980 this .store = store ;
7081 this .currentJob = currentJob ;
7182 this .jobManager = jobManager ;
7283 this .jobUpdateTimeoutSeconds = jobUpdateTimeoutSeconds ;
7384 this .runnerName = jobManager .getRunnerType ().toString ();
85+ this .targetStatus = targetStatus ;
7486 }
7587
7688 @ Override
7789 public Job call () {
7890 ExecutorService executorService = Executors .newSingleThreadExecutor ();
7991 Future <Job > submittedJob ;
8092
81- if (currentJob .isEmpty ()) {
93+ if (this . targetStatus . equals ( JobTargetStatus . RUNNING ) && currentJob .isEmpty ()) {
8294 submittedJob = executorService .submit (this ::createJob );
95+ } else if (this .targetStatus .equals (JobTargetStatus .RUNNING )
96+ && currentJob .isPresent ()
97+ && requiresUpdate (currentJob .get ())) {
98+ submittedJob = executorService .submit (() -> updateJob (currentJob .get ()));
99+ } else if (this .targetStatus .equals (JobTargetStatus .ABORTED )
100+ && currentJob .isPresent ()
101+ && currentJob .get ().getStatus () == JobStatus .RUNNING ) {
102+ submittedJob = executorService .submit (() -> stopJob (currentJob .get ()));
103+ } else if (this .targetStatus .equals (JobTargetStatus .ABORTED ) && currentJob .isEmpty ()) {
104+ throw new IllegalArgumentException ("Cannot abort an nonexistent ingestion job." );
83105 } else {
84- Job job = currentJob .get ();
85-
86- if (requiresUpdate (job )) {
87- submittedJob = executorService .submit (() -> updateJob (job ));
88- } else {
89- return updateStatus (job );
90- }
106+ return this .updateStatus (currentJob .get ());
91107 }
92108
93109 try {
94110 return submittedJob .get (getJobUpdateTimeoutSeconds (), TimeUnit .SECONDS );
95111 } catch (InterruptedException | ExecutionException | TimeoutException e ) {
96- log .warn ("Unable to start job for source {} and sink {}: {}" , source , store , e .getMessage ());
112+ log .warn ("Unable to start job for source {} and sink {}:" , source , store );
113+ e .printStackTrace ();
97114 return null ;
98115 } finally {
99116 executorService .shutdownNow ();
@@ -111,24 +128,38 @@ boolean requiresUpdate(Job job) {
111128 }
112129
113130 private Job createJob () {
114- String jobId = createJobId (source . getId () , store .getName ());
131+ String jobId = createJobId (source , store .getName ());
115132 return startJob (jobId );
116133 }
117134
118135 /** Start or update the job to ingest data to the sink. */
119136 private Job startJob (String jobId ) {
120- Job job = new Job ();
121- job .setId (jobId );
122- job .setRunner (jobManager .getRunnerType ());
123- job .setSource (source );
124- job .setStore (store );
125- job .setStatus (JobStatus .PENDING );
137+ Job job =
138+ Job .builder ()
139+ .setId (jobId )
140+ .setRunner (jobManager .getRunnerType ())
141+ .setSource (source )
142+ .setStore (store )
143+ .setStatus (JobStatus .PENDING )
144+ .setFeatureSetJobStatuses (new HashSet <>())
145+ .build ();
126146
127147 updateFeatureSets (job );
128148
129149 try {
130150 logAudit (Action .SUBMIT , job , "Building graph and submitting to %s" , runnerName );
131151
152+ System .out .println (
153+ job .equals (
154+ Job .builder ()
155+ .setId ("job" )
156+ .setExtId ("" )
157+ .setRunner (Runner .DATAFLOW )
158+ .setSource (source )
159+ .setStore (store )
160+ .setStatus (JobStatus .PENDING )
161+ .build ()));
162+
132163 job = jobManager .startJob (job );
133164 var extId = job .getExtId ();
134165 if (extId .isEmpty ()) {
@@ -182,6 +213,12 @@ private Job updateJob(Job job) {
182213 return jobManager .updateJob (job );
183214 }
184215
216+ /** Stop the given job */
217+ private Job stopJob (Job job ) {
218+ logAudit (Action .ABORT , job , "Aborting job %s for runner %s" , job .getId (), runnerName );
219+ return jobManager .abortJob (job );
220+ }
221+
185222 private Job updateStatus (Job job ) {
186223 JobStatus currentStatus = job .getStatus ();
187224 JobStatus newStatus = jobManager .getJobStatus (job );
@@ -195,14 +232,13 @@ private Job updateStatus(Job job) {
195232 return job ;
196233 }
197234
198- String createJobId (String sourceId , String storeName ) {
235+ String createJobId (Source source , String storeName ) {
199236 String dateSuffix = String .valueOf (Instant .now ().toEpochMilli ());
200- String [] sourceParts = sourceId .split ("/" , 2 );
201- String sourceType = sourceParts [0 ].toLowerCase ();
202- String sourceHash =
203- Hashing .murmur3_128 ().hashUnencodedChars (sourceParts [1 ]).toString ().substring (0 , 10 );
204- String jobId = String .format ("%s-%s-to-%s-%s" , sourceType , sourceHash , storeName , dateSuffix );
205- return jobId .replaceAll ("_" , "-" );
237+ String jobId =
238+ String .format (
239+ "%s-%d-to-%s-%s" ,
240+ source .getTypeString (), Objects .hashCode (source .getConfig ()), storeName , dateSuffix );
241+ return jobId .replaceAll ("_store" , "-" );
206242 }
207243
208244 private void logAudit (Action action , Job job , String detail , Object ... args ) {
0 commit comments