Skip to content

Commit 51f4fb6

Browse files
author
Oleksii Moskalenko
authored
Use JobManager's backend as persistent storage and source of truth (#903)
* in memory job repository fixes after rebase fix python tests * handle empty response from dataflow * test job with no labels * some comments * some apidocs * fix comment * fix tests * refactor merging jobSelector & labels * default project name * default project name
1 parent 9fc44bf commit 51f4fb6

45 files changed

Lines changed: 1401 additions & 1949 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

common/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
<dependency>
5959
<groupId>org.projectlombok</groupId>
6060
<artifactId>lombok</artifactId>
61+
<version>${lombok.version}</version>
6162
</dependency>
6263
<dependency>
6364
<groupId>javax.validation</groupId>

common/src/main/java/feast/common/models/FeatureSetReference.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
@Data
2727
@AllArgsConstructor
2828
public class FeatureSetReference implements Serializable {
29+
public static String PROJECT_DEFAULT_NAME = "default";
30+
2931
/* Name of project to which this featureSet is assigned */
3032
private String projectName;
3133
/* Name of FeatureSet */
@@ -38,9 +40,14 @@ public class FeatureSetReference implements Serializable {
3840
public FeatureSetReference() {}
3941

4042
public static FeatureSetReference of(String projectName, String featureSetName, Integer version) {
43+
projectName = projectName.isEmpty() ? PROJECT_DEFAULT_NAME : projectName;
4144
return new FeatureSetReference(projectName, featureSetName, version);
4245
}
4346

47+
public static FeatureSetReference of(String projectName, String featureSetName) {
48+
return FeatureSetReference.of(projectName, featureSetName, -1);
49+
}
50+
4451
public String getReference() {
4552
return String.format("%s/%s", getProjectName(), getFeatureSetName());
4653
}

core/pom.xml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,7 @@
244244
<dependency>
245245
<groupId>org.projectlombok</groupId>
246246
<artifactId>lombok</artifactId>
247+
<version>${lombok.version}</version>
247248
</dependency>
248249

249250
<dependency>
@@ -381,5 +382,17 @@
381382
<version>3.0.0</version>
382383
<scope>test</scope>
383384
</dependency>
385+
386+
<dependency>
387+
<groupId>com.google.auto.value</groupId>
388+
<artifactId>auto-value-annotations</artifactId>
389+
<version>1.6.6</version>
390+
</dependency>
391+
<dependency>
392+
<groupId>com.google.auto.value</groupId>
393+
<artifactId>auto-value</artifactId>
394+
<version>1.6.6</version>
395+
<scope>provided</scope>
396+
</dependency>
384397
</dependencies>
385398
</project>

core/src/main/java/feast/core/config/FeastProperties.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,8 +103,18 @@ public static class JobProperties {
103103
/* The active Apache Beam runner name. This name references one instance of the Runner class */
104104
private String activeRunner;
105105

106-
/* If true only one IngestionJob would be created per source with all subscribed stores in it */
107-
private Boolean consolidateJobsPerSource = false;
106+
/* Job Coordinator related properties */
107+
private CoordinatorProperties coordinator;
108+
109+
@Getter
110+
@Setter
111+
public static class CoordinatorProperties {
112+
/* If true only one IngestionJob would be created per source with all subscribed stores in it */
113+
private Boolean consolidateJobsPerSource = false;
114+
115+
/* Labels to identify jobs managed by this job coordinator */
116+
private Map<String, String> jobSelector = new HashMap<>();
117+
}
108118

109119
/** List of configured job runners. */
110120
private List<Runner> runners = new ArrayList<>();

core/src/main/java/feast/core/config/JobConfig.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,11 @@
2020
import com.google.protobuf.InvalidProtocolBufferException;
2121
import com.google.protobuf.util.JsonFormat;
2222
import feast.core.config.FeastProperties.JobProperties;
23-
import feast.core.dao.JobRepository;
2423
import feast.core.job.ConsolidatedJobStrategy;
2524
import feast.core.job.JobGroupingStrategy;
2625
import feast.core.job.JobManager;
2726
import feast.core.job.JobPerStoreStrategy;
27+
import feast.core.job.JobRepository;
2828
import feast.core.job.dataflow.DataflowJobManager;
2929
import feast.core.job.direct.DirectJobRegistry;
3030
import feast.core.job.direct.DirectRunnerJobManager;
@@ -80,7 +80,8 @@ public IngestionJobProto.SpecsStreamingUpdateConfig createSpecsStreamingUpdateCo
8080
@Bean
8181
public JobGroupingStrategy getJobGroupingStrategy(
8282
FeastProperties feastProperties, JobRepository jobRepository) {
83-
Boolean shouldConsolidateJobs = feastProperties.getJobs().getConsolidateJobsPerSource();
83+
Boolean shouldConsolidateJobs =
84+
feastProperties.getJobs().getCoordinator().getConsolidateJobsPerSource();
8485
if (shouldConsolidateJobs) {
8586
return new ConsolidatedJobStrategy(jobRepository);
8687
} else {
@@ -103,17 +104,20 @@ public JobManager getJobManager(
103104
JobProperties jobProperties = feastProperties.getJobs();
104105
FeastProperties.JobProperties.Runner runner = jobProperties.getActiveRunner();
105106
Map<String, Object> runnerConfigOptions = runner.getOptions();
106-
String configJson = gson.toJson(runnerConfigOptions);
107107

108108
FeastProperties.MetricsProperties metrics = jobProperties.getMetrics();
109+
String configJson = gson.toJson(runnerConfigOptions);
109110

110111
switch (runner.getType()) {
111112
case DATAFLOW:
112113
DataflowRunnerConfigOptions.Builder dataflowRunnerConfigOptions =
113114
DataflowRunnerConfigOptions.newBuilder();
114115
JsonFormat.parser().merge(configJson, dataflowRunnerConfigOptions);
115-
return new DataflowJobManager(
116-
dataflowRunnerConfigOptions.build(), metrics, specsStreamingUpdateConfig);
116+
return DataflowJobManager.of(
117+
dataflowRunnerConfigOptions.build(),
118+
metrics,
119+
specsStreamingUpdateConfig,
120+
jobProperties.getCoordinator().getJobSelector());
117121
case DIRECT:
118122
DirectRunnerConfigOptions.Builder directRunnerConfigOptions =
119123
DirectRunnerConfigOptions.newBuilder();

core/src/main/java/feast/core/dao/FeatureSetJobStatusRepository.java

Lines changed: 0 additions & 25 deletions
This file was deleted.

core/src/main/java/feast/core/job/ConsolidatedJobStrategy.java

Lines changed: 26 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,11 @@
1616
*/
1717
package feast.core.job;
1818

19-
import feast.core.dao.JobRepository;
2019
import feast.core.model.Job;
2120
import feast.core.model.JobStatus;
22-
import feast.core.model.Source;
23-
import feast.core.model.Store;
21+
import feast.proto.core.SourceProto;
22+
import feast.proto.core.StoreProto;
2423
import java.time.Instant;
25-
import java.util.HashSet;
2624
import java.util.Map;
2725
import java.util.Objects;
2826
import java.util.Set;
@@ -45,35 +43,43 @@ public ConsolidatedJobStrategy(JobRepository jobRepository) {
4543
}
4644

4745
@Override
48-
public Job getOrCreateJob(Source source, Set<Store> stores) {
46+
public Job getOrCreateJob(SourceProto.Source source, Set<StoreProto.Store> stores) {
4947
return jobRepository
50-
.findFirstBySourceTypeAndSourceConfigAndStoreNameAndStatusNotInOrderByLastUpdatedDesc(
51-
source.getType(), source.getConfig(), null, JobStatus.getTerminalStates())
48+
.findFirstBySourceAndStoreNameAndStatusNotInOrderByLastUpdatedDesc(
49+
source, null, JobStatus.getTerminalStates())
5250
.orElseGet(
53-
() -> {
54-
Job job =
55-
Job.builder().setSource(source).setFeatureSetJobStatuses(new HashSet<>()).build();
56-
job.setStores(stores);
57-
return job;
58-
});
51+
() ->
52+
Job.builder()
53+
.setId(createJobId(source))
54+
.setSource(source)
55+
.setStores(
56+
stores.stream()
57+
.collect(Collectors.toMap(StoreProto.Store::getName, s -> s)))
58+
.build());
5959
}
6060

61-
@Override
62-
public String createJobId(Job job) {
61+
private String createJobId(SourceProto.Source source) {
6362
String dateSuffix = String.valueOf(Instant.now().toEpochMilli());
6463
String jobId =
6564
String.format(
6665
"%s-%d-%s",
67-
job.getSource().getTypeString(),
68-
Objects.hashCode(job.getSource().getConfig()),
66+
source.getType().getValueDescriptor().getName(),
67+
Objects.hash(
68+
source.getKafkaSourceConfig().getBootstrapServers(),
69+
source.getKafkaSourceConfig().getTopic()),
6970
dateSuffix);
7071
return jobId.replaceAll("_store", "-").toLowerCase();
7172
}
7273

7374
@Override
74-
public Iterable<Pair<Source, Set<Store>>> collectSingleJobInput(
75-
Stream<Pair<Source, Store>> stream) {
76-
Map<Source, Set<Store>> map =
75+
public String createJobId(Job job) {
76+
return createJobId(job.getSource());
77+
}
78+
79+
@Override
80+
public Iterable<Pair<SourceProto.Source, Set<StoreProto.Store>>> collectSingleJobInput(
81+
Stream<Pair<SourceProto.Source, StoreProto.Store>> stream) {
82+
Map<SourceProto.Source, Set<StoreProto.Store>> map =
7783
stream.collect(
7884
Collectors.groupingBy(
7985
Pair::getLeft, Collectors.mapping(Pair::getRight, Collectors.toSet())));
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
* Copyright 2018-2020 The Feast Authors
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* https://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package feast.core.job;
18+
19+
import com.google.common.collect.Lists;
20+
import feast.common.models.FeatureSetReference;
21+
import feast.core.model.Job;
22+
import feast.core.model.JobStatus;
23+
import feast.proto.core.SourceProto;
24+
import java.util.*;
25+
import java.util.function.Predicate;
26+
import java.util.stream.Collectors;
27+
import org.springframework.beans.factory.annotation.Autowired;
28+
import org.springframework.stereotype.Component;
29+
30+
/**
31+
* Keeps state of all jobs managed by current application in memory. On start loads persistent state
32+
* through JobManager from JobManager backend.
33+
*
34+
* <p>E.g., with Dataflow runner all jobs with their state stored on Google's side and accessible
35+
* via API. So we don't need to persist this state ourselves. Instead we just fetch it once and then
36+
* we apply same changes to dataflow (via JobManager) and to InMemoryRepository to keep them in
37+
* sync.
38+
*
39+
* <p>Provides flexible access to objects via JPA-like filtering API.
40+
*/
41+
@Component
42+
public class InMemoryJobRepository implements JobRepository {
43+
private final JobManager jobManager;
44+
45+
/** Internal storage for all jobs mapped by their Id */
46+
private Map<String, Job> storage;
47+
48+
@Autowired
49+
public InMemoryJobRepository(JobManager jobManager) {
50+
this.jobManager = jobManager;
51+
this.storage = new HashMap<>();
52+
53+
this.storage =
54+
this.jobManager.listRunningJobs().stream().collect(Collectors.toMap(Job::getId, j -> j));
55+
}
56+
57+
/**
58+
* Returns single job that has given source, store with given name ans its status is not in given
59+
* statuses. We expect this parameters to specify only one RUNNING job (most of the time). But in
60+
* case there're many - we return latest updated one.
61+
*
62+
* @return job that matches given parameters if it's present
63+
*/
64+
@Override
65+
public Optional<Job> findFirstBySourceAndStoreNameAndStatusNotInOrderByLastUpdatedDesc(
66+
SourceProto.Source source, String storeName, Collection<JobStatus> statuses) {
67+
return this.storage.values().stream()
68+
.filter(
69+
j ->
70+
j.getSource().equals(source)
71+
&& (storeName == null || j.getStores().containsKey(storeName))
72+
&& (!statuses.contains(j.getStatus())))
73+
.max(Comparator.comparing(Job::getLastUpdated));
74+
}
75+
76+
private List<Job> findWithFilter(Predicate<Job> p) {
77+
return this.storage.values().stream().filter(p).collect(Collectors.toList());
78+
}
79+
80+
/** Find Jobs that have given status */
81+
@Override
82+
public List<Job> findByStatus(JobStatus status) {
83+
return this.findWithFilter(j -> j.getStatus().equals(status));
84+
}
85+
86+
/**
87+
* Find Jobs that have given FeatureSet (specified by {@link FeatureSetReference} allocated to it.
88+
*/
89+
@Override
90+
public List<Job> findByFeatureSetReference(FeatureSetReference reference) {
91+
return this.findWithFilter(j -> j.getFeatureSetDeliveryStatuses().containsKey(reference));
92+
}
93+
94+
/** Find Jobs that have one of the stores with given name */
95+
@Override
96+
public List<Job> findByJobStoreName(String storeName) {
97+
return this.findWithFilter(j -> j.getStores().containsKey(storeName));
98+
}
99+
100+
/** Find by Job's Id */
101+
@Override
102+
public Optional<Job> findById(String jobId) {
103+
return Optional.ofNullable(this.storage.get(jobId));
104+
}
105+
106+
@Override
107+
public List<Job> findAll() {
108+
return Lists.newArrayList(this.storage.values());
109+
}
110+
111+
@Override
112+
public void add(Job job) {
113+
job.preSave();
114+
115+
this.storage.put(job.getId(), job);
116+
}
117+
118+
@Override
119+
public void deleteAll() {
120+
this.storage.clear();
121+
}
122+
}

core/src/main/java/feast/core/job/JobGroupingStrategy.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717
package feast.core.job;
1818

1919
import feast.core.model.Job;
20-
import feast.core.model.Source;
21-
import feast.core.model.Store;
20+
import feast.proto.core.SourceProto;
21+
import feast.proto.core.StoreProto;
2222
import java.util.Set;
2323
import java.util.stream.Stream;
2424
import org.apache.commons.lang3.tuple.Pair;
@@ -29,10 +29,10 @@
2929
*/
3030
public interface JobGroupingStrategy {
3131
/** Get the non terminated ingestion job ingesting for given source and stores. */
32-
public Job getOrCreateJob(Source source, Set<Store> stores);
32+
Job getOrCreateJob(SourceProto.Source source, Set<StoreProto.Store> stores);
3333
/** Create unique JobId that would be used as key in communications with JobRunner */
34-
public String createJobId(Job job);
34+
String createJobId(Job job);
3535
/* Distribute given sources and stores across jobs. One yielded Pair - one created Job **/
36-
public Iterable<Pair<Source, Set<Store>>> collectSingleJobInput(
37-
Stream<Pair<Source, Store>> stream);
36+
Iterable<Pair<SourceProto.Source, Set<StoreProto.Store>>> collectSingleJobInput(
37+
Stream<Pair<SourceProto.Source, StoreProto.Store>> stream);
3838
}

core/src/main/java/feast/core/job/JobManager.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import feast.core.model.Job;
2020
import feast.core.model.JobStatus;
21+
import java.util.List;
2122

2223
public interface JobManager {
2324

@@ -70,4 +71,11 @@ public interface JobManager {
7071
* @return job status.
7172
*/
7273
JobStatus getJobStatus(Job job);
74+
75+
/**
76+
* List of RUNNING jobs
77+
*
78+
* @return list of jobs
79+
*/
80+
List<Job> listRunningJobs();
7381
}

0 commit comments

Comments
 (0)