Skip to content

Commit 3fd6c7a

Browse files
authored
Clean up Feast configuration (#611)
* Add validation to Core configuration and fix version loading Refactor, document, and validate Feast Core Properties Refactor FeastProperties to support nested store configuration Localize all store configuration in Serving in Spring configuration Various configuration updates * Allow Feast Serving to use types properties instead of maps * Reuse Feast Core Store model in serving * Remove redundant config classes for Redis * Update Serving Beans and Config classes to use ne1w configuration getters * Remove hot-loading from store configuration. This reduces a bit of flexibility, but simplifies the code and configuration * Set default build version in Feast Core "version" field in Feast Properties * Ensure FeatureSink creation is consistent for both Redis and BigQuery * Move BigQueryHistoricalRetriever configuration into Retriever from ServingServiceConfig * Allow a list of stores to be configured for forward compability * Remove Lombok from Serving configuration * Update Store configuration loading in serving to use a store model * Update RedisBackedJobService to instantiate its own Redis Client * Update comments in FeastProperties * Fix broken default application.yml and add comments in Serving * Refactored and cleaned up Feast Core configuration for job runners. * Remove commented out DataflowRunnerConfig setters * Clean up getJobManager and simplify field mapping in DataflowRunnerConfig * Add static factory methods to retrievers * Remove runner specific comment typo * Add oneOfStrings validator annotation for configuration validation * Fix broken Dataflow unit test that depends on GOOGLE_APPLICATION_CREDENTIALS
1 parent e461cde commit 3fd6c7a

File tree

49 files changed

+1432
-898
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+1432
-898
lines changed

core/pom.xml

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,14 @@
3838
<configuration>
3939
<skip>false</skip>
4040
</configuration>
41+
<executions>
42+
<execution>
43+
<id>build-info</id>
44+
<goals>
45+
<goal>build-info</goal>
46+
</goals>
47+
</execution>
48+
</executions>
4149
</plugin>
4250
</plugins>
4351
</build>
@@ -207,5 +215,21 @@
207215
<artifactId>jaxb-api</artifactId>
208216
</dependency>
209217

218+
<dependency>
219+
<groupId>javax.validation</groupId>
220+
<artifactId>validation-api</artifactId>
221+
<version>2.0.0.Final</version>
222+
</dependency>
223+
<dependency>
224+
<groupId>org.hibernate.validator</groupId>
225+
<artifactId>hibernate-validator</artifactId>
226+
<version>6.1.2.Final</version>
227+
</dependency>
228+
<dependency>
229+
<groupId>org.hibernate.validator</groupId>
230+
<artifactId>hibernate-validator-annotation-processor</artifactId>
231+
<version>6.1.2.Final</version>
232+
</dependency>
233+
210234
</dependencies>
211235
</project>

core/src/main/java/feast/core/config/FeastProperties.java

Lines changed: 173 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,53 +16,211 @@
1616
*/
1717
package feast.core.config;
1818

19-
import java.util.Map;
19+
import feast.core.config.FeastProperties.StreamProperties.FeatureStreamOptions;
20+
import feast.core.validators.OneOfStrings;
21+
import java.util.*;
22+
import javax.annotation.PostConstruct;
23+
import javax.validation.*;
24+
import javax.validation.constraints.NotBlank;
25+
import javax.validation.constraints.NotNull;
26+
import javax.validation.constraints.Positive;
2027
import lombok.Getter;
2128
import lombok.Setter;
29+
import org.hibernate.validator.constraints.URL;
30+
import org.springframework.beans.factory.annotation.Autowired;
2231
import org.springframework.boot.context.properties.ConfigurationProperties;
32+
import org.springframework.boot.info.BuildProperties;
2333

2434
@Getter
2535
@Setter
2636
@ConfigurationProperties(prefix = "feast", ignoreInvalidFields = true)
2737
public class FeastProperties {
2838

29-
private String version;
30-
private JobProperties jobs;
39+
/**
40+
* Instantiates a new Feast properties.
41+
*
42+
* @param buildProperties Feast build properties
43+
*/
44+
@Autowired
45+
public FeastProperties(BuildProperties buildProperties) {
46+
setVersion(buildProperties.getVersion());
47+
}
48+
49+
/** Instantiates a new Feast properties. */
50+
public FeastProperties() {}
51+
52+
/* Feast Core Build Version */
53+
@NotBlank private String version = "unknown";
54+
55+
/* Population job properties */
56+
@NotNull private JobProperties jobs;
57+
58+
@NotNull
59+
/* Feast Kafka stream properties */
3160
private StreamProperties stream;
3261

62+
/** Feast job properties. These properties are used for ingestion jobs. */
3363
@Getter
3464
@Setter
3565
public static class JobProperties {
3666

37-
private String runner;
38-
private Map<String, String> options;
67+
@NotBlank
68+
/* The active Apache Beam runner name. This name references one instance of the Runner class */
69+
private String activeRunner;
70+
71+
/** List of configured job runners. */
72+
private List<Runner> runners = new ArrayList<>();
73+
74+
/**
75+
* Gets a {@link Runner} instance of the active runner
76+
*
77+
* @return the active runner
78+
*/
79+
public Runner getActiveRunner() {
80+
for (Runner runner : getRunners()) {
81+
if (activeRunner.equals(runner.getName())) {
82+
return runner;
83+
}
84+
}
85+
throw new RuntimeException(
86+
String.format(
87+
"Active runner is misconfigured. Could not find runner: %s.", activeRunner));
88+
}
89+
90+
/** Job Runner class. */
91+
@Getter
92+
@Setter
93+
public static class Runner {
94+
/** Job runner name. This must be unique. */
95+
String name;
96+
97+
/** Job runner type DirectRunner, DataflowRunner currently supported */
98+
String type;
99+
100+
/**
101+
* Job runner configuration options. See the following for options
102+
* https://api.docs.feast.dev/grpc/feast.core.pb.html#Runner
103+
*/
104+
Map<String, String> options = new HashMap<>();
105+
106+
/**
107+
* Gets the job runner type as an enum.
108+
*
109+
* @return Returns the job runner type as {@link feast.core.job.Runner}
110+
*/
111+
public feast.core.job.Runner getType() {
112+
return feast.core.job.Runner.fromString(type);
113+
}
114+
}
115+
116+
@NotNull
117+
/* Population job metric properties */
39118
private MetricsProperties metrics;
40-
private JobUpdatesProperties updates;
41-
}
42119

43-
@Getter
44-
@Setter
45-
public static class JobUpdatesProperties {
120+
/* Timeout in seconds for each attempt to update or submit a new job to the runner */
121+
@Positive private long jobUpdateTimeoutSeconds;
46122

47-
private long timeoutSeconds;
48-
private long pollingIntervalMillis;
123+
/* Job update polling interval in millisecond. How frequently Feast will update running jobs. */
124+
@Positive private long pollingIntervalMilliseconds;
49125
}
50126

127+
/** Properties used to configure Feast's managed Kafka feature stream. */
51128
@Getter
52129
@Setter
53130
public static class StreamProperties {
54131

132+
/* Feature stream type. Only "kafka" is supported. */
133+
@OneOfStrings({"kafka"})
134+
@NotBlank
55135
private String type;
56-
private Map<String, String> options;
136+
137+
/* Feature stream options */
138+
@NotNull private FeatureStreamOptions options;
139+
140+
/** Feature stream options */
141+
@Getter
142+
@Setter
143+
public static class FeatureStreamOptions {
144+
145+
/* Kafka topic to use for feature sets without source topics. */
146+
@NotBlank private String topic = "feast-features";
147+
148+
/**
149+
* Comma separated list of Kafka bootstrap servers. Used for feature sets without a defined
150+
* source.
151+
*/
152+
@NotBlank private String bootstrapServers = "localhost:9092";
153+
154+
/* Defines the number of copies of managed feature stream Kafka. */
155+
@Positive private short replicationFactor = 1;
156+
157+
/* Number of Kafka partitions to to use for managed feature stream. */
158+
@Positive private int partitions = 1;
159+
}
57160
}
58161

162+
/** Feast population job metrics */
59163
@Getter
60164
@Setter
61165
public static class MetricsProperties {
62166

167+
/* Population job metrics enabled */
63168
private boolean enabled;
169+
170+
/* Metric type. Possible options: statsd */
171+
@OneOfStrings({"statsd"})
172+
@NotBlank
64173
private String type;
65-
private String host;
66-
private int port;
174+
175+
/* Host of metric sink */
176+
@URL private String host;
177+
178+
/* Port of metric sink */
179+
@Positive private int port;
180+
}
181+
182+
/**
183+
* Validates all FeastProperties. This method runs after properties have been initialized and
184+
* individually and conditionally validates each class.
185+
*/
186+
@PostConstruct
187+
public void validate() {
188+
ValidatorFactory factory = Validation.buildDefaultValidatorFactory();
189+
Validator validator = factory.getValidator();
190+
191+
// Validate root fields in FeastProperties
192+
Set<ConstraintViolation<FeastProperties>> violations = validator.validate(this);
193+
if (!violations.isEmpty()) {
194+
throw new ConstraintViolationException(violations);
195+
}
196+
197+
// Validate Stream properties
198+
Set<ConstraintViolation<StreamProperties>> streamPropertyViolations =
199+
validator.validate(getStream());
200+
if (!streamPropertyViolations.isEmpty()) {
201+
throw new ConstraintViolationException(streamPropertyViolations);
202+
}
203+
204+
// Validate Stream Options
205+
Set<ConstraintViolation<FeatureStreamOptions>> featureStreamOptionsViolations =
206+
validator.validate(getStream().getOptions());
207+
if (!featureStreamOptionsViolations.isEmpty()) {
208+
throw new ConstraintViolationException(featureStreamOptionsViolations);
209+
}
210+
211+
// Validate JobProperties
212+
Set<ConstraintViolation<JobProperties>> jobPropertiesViolations = validator.validate(getJobs());
213+
if (!jobPropertiesViolations.isEmpty()) {
214+
throw new ConstraintViolationException(jobPropertiesViolations);
215+
}
216+
217+
// Validate MetricsProperties
218+
if (getJobs().getMetrics().isEnabled()) {
219+
Set<ConstraintViolation<MetricsProperties>> jobMetricViolations =
220+
validator.validate(getJobs().getMetrics());
221+
if (!jobMetricViolations.isEmpty()) {
222+
throw new ConstraintViolationException(jobMetricViolations);
223+
}
224+
}
67225
}
68226
}

core/src/main/java/feast/core/config/FeatureStreamConfig.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,8 @@ public Source getDefaultSource(FeastProperties feastProperties) {
4848
SourceType featureStreamType = SourceType.valueOf(streamProperties.getType().toUpperCase());
4949
switch (featureStreamType) {
5050
case KAFKA:
51-
String bootstrapServers = streamProperties.getOptions().get("bootstrapServers");
52-
String topicName = streamProperties.getOptions().get("topic");
51+
String bootstrapServers = streamProperties.getOptions().getBootstrapServers();
52+
String topicName = streamProperties.getOptions().getTopic();
5353
Map<String, Object> map = new HashMap<>();
5454
map.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
5555
map.put(
@@ -59,9 +59,8 @@ public Source getDefaultSource(FeastProperties feastProperties) {
5959
NewTopic newTopic =
6060
new NewTopic(
6161
topicName,
62-
Integer.valueOf(streamProperties.getOptions().getOrDefault("numPartitions", "1")),
63-
Short.valueOf(
64-
streamProperties.getOptions().getOrDefault("replicationFactor", "1")));
62+
streamProperties.getOptions().getPartitions(),
63+
streamProperties.getOptions().getReplicationFactor());
6564
CreateTopicsResult createTopicsResult =
6665
client.createTopics(Collections.singleton(newTopic));
6766
try {

core/src/main/java/feast/core/config/JobConfig.java

Lines changed: 10 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -16,22 +16,11 @@
1616
*/
1717
package feast.core.config;
1818

19-
import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
20-
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
21-
import com.google.api.client.json.jackson2.JacksonFactory;
22-
import com.google.api.services.dataflow.Dataflow;
23-
import com.google.api.services.dataflow.DataflowScopes;
24-
import com.google.common.base.Strings;
2519
import feast.core.config.FeastProperties.JobProperties;
26-
import feast.core.config.FeastProperties.JobUpdatesProperties;
2720
import feast.core.job.JobManager;
28-
import feast.core.job.Runner;
2921
import feast.core.job.dataflow.DataflowJobManager;
3022
import feast.core.job.direct.DirectJobRegistry;
3123
import feast.core.job.direct.DirectRunnerJobManager;
32-
import java.io.IOException;
33-
import java.security.GeneralSecurityException;
34-
import java.util.HashMap;
3524
import java.util.Map;
3625
import lombok.extern.slf4j.Slf4j;
3726
import org.springframework.beans.factory.annotation.Autowired;
@@ -44,65 +33,26 @@
4433
public class JobConfig {
4534

4635
/**
47-
* Get a JobManager according to the runner type and dataflow configuration.
36+
* Get a JobManager according to the runner type and Dataflow configuration.
4837
*
4938
* @param feastProperties feast config properties
5039
*/
5140
@Bean
5241
@Autowired
53-
public JobManager getJobManager(
54-
FeastProperties feastProperties, DirectJobRegistry directJobRegistry) {
42+
public JobManager getJobManager(FeastProperties feastProperties) {
5543

5644
JobProperties jobProperties = feastProperties.getJobs();
57-
Runner runner = Runner.fromString(jobProperties.getRunner());
58-
if (jobProperties.getOptions() == null) {
59-
jobProperties.setOptions(new HashMap<>());
60-
}
61-
Map<String, String> jobOptions = jobProperties.getOptions();
62-
switch (runner) {
63-
case DATAFLOW:
64-
if (Strings.isNullOrEmpty(jobOptions.getOrDefault("region", null))
65-
|| Strings.isNullOrEmpty(jobOptions.getOrDefault("project", null))) {
66-
log.error("Project and location of the Dataflow runner is not configured");
67-
throw new IllegalStateException(
68-
"Project and location of Dataflow runner must be specified for jobs to be run on Dataflow runner.");
69-
}
70-
try {
71-
GoogleCredential credential =
72-
GoogleCredential.getApplicationDefault().createScoped(DataflowScopes.all());
73-
Dataflow dataflow =
74-
new Dataflow(
75-
GoogleNetHttpTransport.newTrustedTransport(),
76-
JacksonFactory.getDefaultInstance(),
77-
credential);
45+
FeastProperties.JobProperties.Runner runner = jobProperties.getActiveRunner();
46+
Map<String, String> runnerConfigOptions = runner.getOptions();
47+
FeastProperties.MetricsProperties metrics = jobProperties.getMetrics();
7848

79-
return new DataflowJobManager(
80-
dataflow, jobProperties.getOptions(), jobProperties.getMetrics());
81-
} catch (IOException e) {
82-
throw new IllegalStateException(
83-
"Unable to find credential required for Dataflow monitoring API", e);
84-
} catch (GeneralSecurityException e) {
85-
throw new IllegalStateException("Security exception while connecting to Dataflow API", e);
86-
} catch (Exception e) {
87-
throw new IllegalStateException("Unable to initialize DataflowJobManager", e);
88-
}
49+
switch (runner.getType()) {
50+
case DATAFLOW:
51+
return new DataflowJobManager(runnerConfigOptions, metrics);
8952
case DIRECT:
90-
return new DirectRunnerJobManager(
91-
jobProperties.getOptions(), directJobRegistry, jobProperties.getMetrics());
53+
return new DirectRunnerJobManager(runnerConfigOptions, new DirectJobRegistry(), metrics);
9254
default:
93-
throw new IllegalArgumentException("Unsupported runner: " + jobProperties.getRunner());
55+
throw new IllegalArgumentException("Unsupported runner: " + runner);
9456
}
9557
}
96-
97-
/** Get a direct job registry */
98-
@Bean
99-
public DirectJobRegistry directJobRegistry() {
100-
return new DirectJobRegistry();
101-
}
102-
103-
/** Extracts job update options from feast core options. */
104-
@Bean
105-
public JobUpdatesProperties jobUpdatesProperties(FeastProperties feastProperties) {
106-
return feastProperties.getJobs().getUpdates();
107-
}
10858
}

0 commit comments

Comments
 (0)