Skip to content

Commit f425d7d

Browse files
woopmrzzy
and authored
Refactor Source & Job data model and Stop Duplicate Ingestion Jobs (feast-dev#685)
* Rebase & squash on master * Fix java unit tests * Rebase on JobService PR fix * Update JobUpdateTask's createJobId() to generate id based on source config instead of id. * Fix wrong conflict resolve in FeatureStreamConfig Co-authored-by: Zhu Zhanyan <program.nom@gmail.com>
1 parent 49dca68 commit f425d7d

31 files changed

+1148
-671
lines changed

core/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@
175175
<artifactId>spring-kafka</artifactId>
176176
</dependency>
177177

178-
<!--compileOnly 'org.projectlombok:lombok:1.18.2'-->
178+
<!--compileOnly 'org.projectlombok:lombok:1.18.12'-->
179179
<dependency>
180180
<groupId>org.projectlombok</groupId>
181181
<artifactId>lombok</artifactId>

core/src/main/java/feast/core/config/FeatureStreamConfig.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import feast.core.util.KafkaSerialization;
2222
import feast.proto.core.FeatureSetProto;
2323
import feast.proto.core.IngestionJobProto;
24+
import feast.proto.core.SourceProto;
2425
import feast.proto.core.SourceProto.KafkaSourceConfig;
2526
import feast.proto.core.SourceProto.SourceType;
2627
import java.util.HashMap;
@@ -145,7 +146,12 @@ public Source getDefaultSource(FeastProperties feastProperties) {
145146
.setBootstrapServers(bootstrapServers)
146147
.setTopic(topicName)
147148
.build();
148-
return new Source(featureStreamType, sourceConfig, true);
149+
SourceProto.Source source =
150+
SourceProto.Source.newBuilder()
151+
.setType(featureStreamType)
152+
.setKafkaSourceConfig(sourceConfig)
153+
.build();
154+
return Source.fromProto(source, true);
149155
default:
150156
throw new RuntimeException("Unsupported source stream, only [KAFKA] is supported");
151157
}

core/src/main/java/feast/core/dao/JobRepository.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,28 @@
1818

1919
import feast.core.model.FeatureSetJobStatus;
2020
import feast.core.model.Job;
21+
import feast.core.model.JobStatus;
22+
import feast.proto.core.SourceProto;
23+
import java.util.Collection;
2124
import java.util.List;
25+
import java.util.Optional;
2226
import org.springframework.data.jpa.repository.JpaRepository;
2327
import org.springframework.stereotype.Repository;
2428

2529
/** JPA repository supplying Job objects keyed by ID. */
2630
@Repository
2731
public interface JobRepository extends JpaRepository<Job, String> {
28-
List<Job> findBySourceIdAndStoreNameOrderByLastUpdatedDesc(String sourceId, String storeName);
32+
Optional<Job>
33+
findFirstBySourceTypeAndSourceConfigAndStoreNameAndStatusNotInOrderByLastUpdatedDesc(
34+
SourceProto.SourceType sourceType,
35+
String sourceConfig,
36+
String storeName,
37+
Collection<JobStatus> statuses);
2938

30-
// find jobs by feast store name
31-
List<Job> findByStoreName(String storeName);
39+
List<Job> findByStatus(JobStatus status);
3240

33-
// find jobs by featureset
3441
List<Job> findByFeatureSetJobStatusesIn(List<FeatureSetJobStatus> featureSetsJobStatuses);
42+
43+
// find jobs by feast store name
44+
List<Job> findByStoreName(String storeName);
3545
}

core/src/main/java/feast/core/dao/SourceRepository.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@
1717
package feast.core.dao;
1818

1919
import feast.core.model.Source;
20+
import feast.proto.core.SourceProto.SourceType;
2021
import org.springframework.data.jpa.repository.JpaRepository;
2122

2223
/** JPA repository supplying Source objects keyed by id. */
23-
public interface SourceRepository extends JpaRepository<Source, String> {}
24+
public interface SourceRepository extends JpaRepository<Source, String> {
25+
Source findFirstByTypeAndConfigOrderByIdAsc(SourceType type, String config);
26+
}

core/src/main/java/feast/core/job/JobManager.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public interface JobManager {
2929
Runner getRunnerType();
3030

3131
/**
32-
* Start an import job.
32+
* Start an import job. Start should change the status of the Job from PENDING to RUNNING.
3333
*
3434
* @param job job to start
3535
* @return Job
@@ -45,11 +45,13 @@ public interface JobManager {
4545
Job updateJob(Job job);
4646

4747
/**
48-
* Abort a job given runner-specific job ID.
48+
* Abort a job given runner-specific job ID. Abort should change the status of the Job from
49+
* RUNNING to ABORTING.
4950
*
50-
* @param extId runner specific job id.
51+
* @param job to abort.
52+
* @return The aborted Job
5153
*/
52-
void abortJob(String extId);
54+
Job abortJob(Job job);
5355

5456
/**
5557
* Restart a job. If the job is in a terminated state, will simply start the job. Might cause data to

core/src/main/java/feast/core/job/JobUpdateTask.java

Lines changed: 63 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@
2323
import feast.core.model.*;
2424
import feast.proto.core.FeatureSetProto;
2525
import java.time.Instant;
26+
import java.util.HashSet;
2627
import java.util.List;
2728
import java.util.Map;
29+
import java.util.Objects;
2830
import java.util.Optional;
2931
import java.util.concurrent.Callable;
3032
import java.util.concurrent.ExecutionException;
@@ -36,21 +38,30 @@
3638
import java.util.stream.Collectors;
3739
import lombok.Getter;
3840
import lombok.extern.slf4j.Slf4j;
39-
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.Hashing;
4041

4142
/**
4243
* JobUpdateTask is a callable that starts or updates a job given a set of featureSetSpecs, as well
43-
* as their source and sink.
44+
* their source and sink to transition to targetStatus.
4445
*
4546
* <p>When complete, the JobUpdateTask returns the updated Job object to be pushed to the db.
4647
*/
4748
@Slf4j
4849
@Getter
4950
public class JobUpdateTask implements Callable<Job> {
5051

52+
/**
53+
* JobTargetStatus enum defines the possible target statuses that JobUpdateTask can transition a
54+
* Job to.
55+
*/
56+
public enum JobTargetStatus {
57+
RUNNING,
58+
ABORTED
59+
}
60+
5161
private final List<FeatureSet> featureSets;
5262
private final Source source;
5363
private final Store store;
64+
private final JobTargetStatus targetStatus;
5465
private final Optional<Job> currentJob;
5566
private final JobManager jobManager;
5667
private final long jobUpdateTimeoutSeconds;
@@ -62,38 +73,44 @@ public JobUpdateTask(
6273
Store store,
6374
Optional<Job> currentJob,
6475
JobManager jobManager,
65-
long jobUpdateTimeoutSeconds) {
66-
76+
long jobUpdateTimeoutSeconds,
77+
JobTargetStatus targetStatus) {
6778
this.featureSets = featureSets;
6879
this.source = source;
6980
this.store = store;
7081
this.currentJob = currentJob;
7182
this.jobManager = jobManager;
7283
this.jobUpdateTimeoutSeconds = jobUpdateTimeoutSeconds;
7384
this.runnerName = jobManager.getRunnerType().toString();
85+
this.targetStatus = targetStatus;
7486
}
7587

7688
@Override
7789
public Job call() {
7890
ExecutorService executorService = Executors.newSingleThreadExecutor();
7991
Future<Job> submittedJob;
8092

81-
if (currentJob.isEmpty()) {
93+
if (this.targetStatus.equals(JobTargetStatus.RUNNING) && currentJob.isEmpty()) {
8294
submittedJob = executorService.submit(this::createJob);
95+
} else if (this.targetStatus.equals(JobTargetStatus.RUNNING)
96+
&& currentJob.isPresent()
97+
&& requiresUpdate(currentJob.get())) {
98+
submittedJob = executorService.submit(() -> updateJob(currentJob.get()));
99+
} else if (this.targetStatus.equals(JobTargetStatus.ABORTED)
100+
&& currentJob.isPresent()
101+
&& currentJob.get().getStatus() == JobStatus.RUNNING) {
102+
submittedJob = executorService.submit(() -> stopJob(currentJob.get()));
103+
} else if (this.targetStatus.equals(JobTargetStatus.ABORTED) && currentJob.isEmpty()) {
104+
throw new IllegalArgumentException("Cannot abort an nonexistent ingestion job.");
83105
} else {
84-
Job job = currentJob.get();
85-
86-
if (requiresUpdate(job)) {
87-
submittedJob = executorService.submit(() -> updateJob(job));
88-
} else {
89-
return updateStatus(job);
90-
}
106+
return this.updateStatus(currentJob.get());
91107
}
92108

93109
try {
94110
return submittedJob.get(getJobUpdateTimeoutSeconds(), TimeUnit.SECONDS);
95111
} catch (InterruptedException | ExecutionException | TimeoutException e) {
96-
log.warn("Unable to start job for source {} and sink {}: {}", source, store, e.getMessage());
112+
log.warn("Unable to start job for source {} and sink {}:", source, store);
113+
e.printStackTrace();
97114
return null;
98115
} finally {
99116
executorService.shutdownNow();
@@ -111,24 +128,38 @@ boolean requiresUpdate(Job job) {
111128
}
112129

113130
private Job createJob() {
114-
String jobId = createJobId(source.getId(), store.getName());
131+
String jobId = createJobId(source, store.getName());
115132
return startJob(jobId);
116133
}
117134

118135
/** Start or update the job to ingest data to the sink. */
119136
private Job startJob(String jobId) {
120-
Job job = new Job();
121-
job.setId(jobId);
122-
job.setRunner(jobManager.getRunnerType());
123-
job.setSource(source);
124-
job.setStore(store);
125-
job.setStatus(JobStatus.PENDING);
137+
Job job =
138+
Job.builder()
139+
.setId(jobId)
140+
.setRunner(jobManager.getRunnerType())
141+
.setSource(source)
142+
.setStore(store)
143+
.setStatus(JobStatus.PENDING)
144+
.setFeatureSetJobStatuses(new HashSet<>())
145+
.build();
126146

127147
updateFeatureSets(job);
128148

129149
try {
130150
logAudit(Action.SUBMIT, job, "Building graph and submitting to %s", runnerName);
131151

152+
System.out.println(
153+
job.equals(
154+
Job.builder()
155+
.setId("job")
156+
.setExtId("")
157+
.setRunner(Runner.DATAFLOW)
158+
.setSource(source)
159+
.setStore(store)
160+
.setStatus(JobStatus.PENDING)
161+
.build()));
162+
132163
job = jobManager.startJob(job);
133164
var extId = job.getExtId();
134165
if (extId.isEmpty()) {
@@ -182,6 +213,12 @@ private Job updateJob(Job job) {
182213
return jobManager.updateJob(job);
183214
}
184215

216+
/** Stop the given job */
217+
private Job stopJob(Job job) {
218+
logAudit(Action.ABORT, job, "Aborting job %s for runner %s", job.getId(), runnerName);
219+
return jobManager.abortJob(job);
220+
}
221+
185222
private Job updateStatus(Job job) {
186223
JobStatus currentStatus = job.getStatus();
187224
JobStatus newStatus = jobManager.getJobStatus(job);
@@ -195,14 +232,13 @@ private Job updateStatus(Job job) {
195232
return job;
196233
}
197234

198-
String createJobId(String sourceId, String storeName) {
235+
String createJobId(Source source, String storeName) {
199236
String dateSuffix = String.valueOf(Instant.now().toEpochMilli());
200-
String[] sourceParts = sourceId.split("/", 2);
201-
String sourceType = sourceParts[0].toLowerCase();
202-
String sourceHash =
203-
Hashing.murmur3_128().hashUnencodedChars(sourceParts[1]).toString().substring(0, 10);
204-
String jobId = String.format("%s-%s-to-%s-%s", sourceType, sourceHash, storeName, dateSuffix);
205-
return jobId.replaceAll("_", "-");
237+
String jobId =
238+
String.format(
239+
"%s-%d-to-%s-%s",
240+
source.getTypeString(), Objects.hashCode(source.getConfig()), storeName, dateSuffix);
241+
return jobId.replaceAll("_store", "-");
206242
}
207243

208244
private void logAudit(Action action, Job job, String detail, Object... args) {

core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ public Job startJob(Job job) {
116116
submitDataflowJob(
117117
job.getId(), job.getSource().toProto(), job.getStore().toProto(), false);
118118
job.setExtId(extId);
119+
job.setStatus(JobStatus.RUNNING);
119120
return job;
120121

121122
} catch (InvalidProtocolBufferException e) {
@@ -136,25 +137,27 @@ public Job startJob(Job job) {
136137
*/
137138
@Override
138139
public Job updateJob(Job job) {
139-
abortJob(job.getExtId());
140+
abortJob(job);
140141
return job;
141142
}
142143

143144
/**
144145
* Abort an existing Dataflow job. Streaming Dataflow jobs are always drained, not cancelled.
145146
*
146-
* @param dataflowJobId Dataflow-specific job id (not the job name)
147+
* @param job to abort.
148+
* @return The aborted Job.
147149
*/
148150
@Override
149-
public void abortJob(String dataflowJobId) {
151+
public Job abortJob(Job job) {
152+
String dataflowJobId = job.getExtId();
150153
try {
151-
com.google.api.services.dataflow.model.Job job =
154+
com.google.api.services.dataflow.model.Job dataflowJob =
152155
dataflow.projects().locations().jobs().get(projectId, location, dataflowJobId).execute();
153156
com.google.api.services.dataflow.model.Job content =
154157
new com.google.api.services.dataflow.model.Job();
155-
if (job.getType().equals(DataflowJobType.JOB_TYPE_BATCH.toString())) {
158+
if (dataflowJob.getType().equals(DataflowJobType.JOB_TYPE_BATCH.toString())) {
156159
content.setRequestedState(DataflowJobState.JOB_STATE_CANCELLED.toString());
157-
} else if (job.getType().equals(DataflowJobType.JOB_TYPE_STREAMING.toString())) {
160+
} else if (dataflowJob.getType().equals(DataflowJobType.JOB_TYPE_STREAMING.toString())) {
158161
content.setRequestedState(DataflowJobState.JOB_STATE_DRAINING.toString());
159162
}
160163
dataflow
@@ -168,6 +171,9 @@ public void abortJob(String dataflowJobId) {
168171
throw new RuntimeException(
169172
Strings.lenientFormat("Unable to drain job with id: %s", dataflowJobId), e);
170173
}
174+
175+
job.setStatus(JobStatus.ABORTING);
176+
return job;
171177
}
172178

173179
/**

core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -121,30 +121,32 @@ private ImportOptions getPipelineOptions(
121121
*/
122122
@Override
123123
public Job updateJob(Job job) {
124-
String jobId = job.getExtId();
125-
abortJob(jobId);
126124
try {
127-
return startJob(job);
125+
return startJob(abortJob(job));
128126
} catch (JobExecutionException e) {
129127
throw new JobExecutionException(String.format("Error running ingestion job: %s", e), e);
130128
}
131129
}
132130

133131
/**
134-
* Abort the direct runner job with the given id, then remove it from the direct jobs registry.
132+
* Abort the direct runner job,removing it from the direct jobs registry.
135133
*
136-
* @param extId runner specific job id.
134+
* @param job to abort.
135+
* @return The aborted Job
137136
*/
138137
@Override
139-
public void abortJob(String extId) {
140-
DirectJob job = jobs.get(extId);
138+
public Job abortJob(Job job) {
139+
DirectJob directJob = jobs.get(job.getExtId());
141140
try {
142-
job.abort();
141+
directJob.abort();
143142
} catch (IOException e) {
144143
throw new RuntimeException(
145-
Strings.lenientFormat("Unable to abort DirectRunner job %s", extId), e);
144+
Strings.lenientFormat("Unable to abort DirectRunner job %s", job.getExtId(), e));
146145
}
147-
jobs.remove(extId);
146+
jobs.remove(job.getExtId());
147+
148+
job.setStatus(JobStatus.ABORTING);
149+
return job;
148150
}
149151

150152
public PipelineResult runPipeline(ImportOptions pipelineOptions) throws IOException {

core/src/main/java/feast/core/model/FeatureSet.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
import lombok.Getter;
3131
import lombok.Setter;
3232
import org.apache.commons.lang3.builder.HashCodeBuilder;
33-
import org.springframework.data.util.Pair;
33+
import org.apache.commons.lang3.tuple.Pair;
3434
import org.tensorflow.metadata.v0.*;
3535

3636
@Getter

0 commit comments

Comments
 (0)