Skip to content

Commit dfc81b9

Browse files
author
Chen Zhiling
authored
Add support for feature set updates and remove versions (feast-dev#676)
* BigQuery connector migrates schema on feature set changes * Update docs, remove versions from protos * Remove versions from core, ApplyFeatureSet updates existing featuresets instead of advancing version * Remove versions from storage API and connectors * Remove versions from ingestion * Remove versions from serving * Remove versions from sdk, examples and end-to-end tests * Ignore versions if provided for backward compatibility * Regenerate golang protos, apply black * Rebase on master * Remove LAST_VERSION * Clean up job update logic * Use set comparison for entities during feature set updates, change FeatureSet.status to use enum directly * Fix tests and error messages * Move strip versions to transform before feature row validate step * Update documentation, clean up toProto method * Remove redundant status setting, correct misleading comments * Add end to end test for feature set updates, squash some bugs * Use count() instead of returning all rows
1 parent 2c5130d commit dfc81b9

File tree

98 files changed

+2511
-2734
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

98 files changed

+2511
-2734
lines changed

core/src/main/java/feast/core/dao/FeatureSetRepository.java

Lines changed: 8 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -25,25 +25,16 @@ public interface FeatureSetRepository extends JpaRepository<FeatureSet, String>
2525

2626
long count();
2727

28-
// Find single feature set by project, name, and version
29-
FeatureSet findFeatureSetByNameAndProject_NameAndVersion(
30-
String name, String project, Integer version);
28+
// Find single feature set by project and name
29+
FeatureSet findFeatureSetByNameAndProject_Name(String name, String project);
3130

32-
// Find single latest version of a feature set by project and name (LIKE)
33-
FeatureSet findFirstFeatureSetByNameLikeAndProject_NameOrderByVersionDesc(
34-
String name, String project);
31+
// find all feature sets and order by name
32+
List<FeatureSet> findAllByOrderByNameAsc();
3533

36-
// find all feature sets and order by name and version
37-
List<FeatureSet> findAllByOrderByNameAscVersionAsc();
34+
// find all feature sets matching the given name pattern with a specific project.
35+
List<FeatureSet> findAllByNameLikeAndProject_NameOrderByNameAsc(String name, String project_name);
3836

39-
// find all feature sets within a project and order by name and version
40-
List<FeatureSet> findAllByProject_NameOrderByNameAscVersionAsc(String project_name);
41-
42-
// find all versions of feature sets matching the given name pattern with a specific project.
43-
List<FeatureSet> findAllByNameLikeAndProject_NameOrderByNameAscVersionAsc(
44-
String name, String project_name);
45-
46-
// find all versions of feature sets matching the given name pattern and project pattern
47-
List<FeatureSet> findAllByNameLikeAndProject_NameLikeOrderByNameAscVersionAsc(
37+
// find all feature sets matching the given name pattern and project pattern
38+
List<FeatureSet> findAllByNameLikeAndProject_NameLikeOrderByNameAsc(
4839
String name, String project_name);
4940
}

core/src/main/java/feast/core/job/JobUpdateTask.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package feast.core.job;
1818

1919
import com.google.common.collect.Sets;
20+
import feast.core.FeatureSetProto.FeatureSetStatus;
2021
import feast.core.log.Action;
2122
import feast.core.log.AuditLogger;
2223
import feast.core.log.Resource;
@@ -28,7 +29,6 @@
2829
import java.time.Instant;
2930
import java.util.List;
3031
import java.util.Optional;
31-
import java.util.Set;
3232
import java.util.concurrent.Callable;
3333
import java.util.concurrent.ExecutionException;
3434
import java.util.concurrent.ExecutorService;
@@ -84,7 +84,7 @@ public Job call() {
8484
} else {
8585
Job job = currentJob.get();
8686

87-
if (featureSetsChangedFor(job)) {
87+
if (requiresUpdate(job)) {
8888
submittedJob = executorService.submit(() -> updateJob(job));
8989
} else {
9090
return updateStatus(job);
@@ -101,11 +101,19 @@ public Job call() {
101101
}
102102
}
103103

104-
boolean featureSetsChangedFor(Job job) {
105-
Set<FeatureSet> existingFeatureSetsPopulatedByJob = Sets.newHashSet(job.getFeatureSets());
106-
Set<FeatureSet> newFeatureSetsPopulatedByJob = Sets.newHashSet(featureSets);
104+
boolean requiresUpdate(Job job) {
105+
// If set of feature sets has changed
106+
if (!Sets.newHashSet(featureSets).equals(Sets.newHashSet(job.getFeatureSets()))) {
107+
return true;
108+
}
107109

108-
return !newFeatureSetsPopulatedByJob.equals(existingFeatureSetsPopulatedByJob);
110+
// If any existing feature set populated by the job has its status as pending
111+
for (FeatureSet featureSet : job.getFeatureSets()) {
112+
if (featureSet.getStatus().equals(FeatureSetStatus.STATUS_PENDING)) {
113+
return true;
114+
}
115+
}
116+
return false;
109117
}
110118

111119
private Job createJob() {

core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public Job startJob(Job job) {
7979
featureSetProtos.add(featureSet.toProto());
8080
}
8181
ImportOptions pipelineOptions =
82-
getPipelineOptions(featureSetProtos, job.getStore().toProto());
82+
getPipelineOptions(job.getId(), featureSetProtos, job.getStore().toProto());
8383
PipelineResult pipelineResult = runPipeline(pipelineOptions);
8484
DirectJob directJob = new DirectJob(job.getId(), pipelineResult);
8585
jobs.add(directJob);
@@ -93,14 +93,16 @@ public Job startJob(Job job) {
9393
}
9494

9595
private ImportOptions getPipelineOptions(
96-
List<FeatureSetProto.FeatureSet> featureSets, StoreProto.Store sink) throws IOException {
96+
String jobName, List<FeatureSetProto.FeatureSet> featureSets, StoreProto.Store sink)
97+
throws IOException {
9798
String[] args = TypeConversion.convertMapToArgs(defaultOptions);
9899
ImportOptions pipelineOptions = PipelineOptionsFactory.fromArgs(args).as(ImportOptions.class);
99100

100101
OptionCompressor<List<FeatureSetProto.FeatureSet>> featureSetJsonCompressor =
101102
new BZip2Compressor<>(new FeatureSetJsonByteConverter());
102103

103104
pipelineOptions.setFeatureSetJson(featureSetJsonCompressor.compress(featureSets));
105+
pipelineOptions.setJobName(jobName);
104106
pipelineOptions.setStoreJson(Collections.singletonList(JsonFormat.printer().print(sink)));
105107
pipelineOptions.setRunner(DirectRunner.class);
106108
pipelineOptions.setProject(""); // set to default value to satisfy validation

core/src/main/java/feast/core/model/Entity.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,10 @@ public static Entity fromProto(EntitySpec entitySpec) {
5454
return entity;
5555
}
5656

57+
public EntitySpec toProto() {
58+
return EntitySpec.newBuilder().setName(name).setValueType(ValueType.Enum.valueOf(type)).build();
59+
}
60+
5761
@Override
5862
public boolean equals(Object o) {
5963
if (this == o) {

core/src/main/java/feast/core/model/Feature.java

Lines changed: 104 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616
*/
1717
package feast.core.model;
1818

19+
import com.google.protobuf.InvalidProtocolBufferException;
1920
import feast.core.FeatureSetProto.FeatureSpec;
21+
import feast.core.FeatureSetProto.FeatureSpec.Builder;
2022
import feast.core.util.TypeConversion;
2123
import feast.types.ValueProto.ValueType;
2224
import java.util.Arrays;
@@ -26,6 +28,7 @@
2628
import javax.persistence.Entity;
2729
import lombok.Getter;
2830
import lombok.Setter;
31+
import org.tensorflow.metadata.v0.*;
2932

3033
/**
3134
* Feature belonging to a featureset. Contains name, type as well as domain metadata about the
@@ -79,6 +82,9 @@ public class Feature {
7982
private byte[] timeOfDayDomain;
8083

8184
public Feature() {}
85+
// Whether this feature has been archived. A archived feature cannot be
86+
// retrieved from or written to.
87+
private boolean archived = false;
8288

8389
private Feature(String name, ValueType.Enum type) {
8490
this.setName(name);
@@ -88,70 +94,148 @@ private Feature(String name, ValueType.Enum type) {
8894
public static Feature fromProto(FeatureSpec featureSpec) {
8995
Feature feature = new Feature(featureSpec.getName(), featureSpec.getValueType());
9096
feature.labels = TypeConversion.convertMapToJsonString(featureSpec.getLabelsMap());
97+
feature.updateSchema(featureSpec);
98+
return feature;
99+
}
100+
101+
public FeatureSpec toProto() throws InvalidProtocolBufferException {
102+
Builder featureSpecBuilder =
103+
FeatureSpec.newBuilder().setName(getName()).setValueType(ValueType.Enum.valueOf(getType()));
104+
105+
if (getPresence() != null) {
106+
featureSpecBuilder.setPresence(FeaturePresence.parseFrom(getPresence()));
107+
} else if (getGroupPresence() != null) {
108+
featureSpecBuilder.setGroupPresence(FeaturePresenceWithinGroup.parseFrom(getGroupPresence()));
109+
}
91110

111+
if (getShape() != null) {
112+
featureSpecBuilder.setShape(FixedShape.parseFrom(getShape()));
113+
} else if (getValueCount() != null) {
114+
featureSpecBuilder.setValueCount(ValueCount.parseFrom(getValueCount()));
115+
}
116+
117+
if (getDomain() != null) {
118+
featureSpecBuilder.setDomain(getDomain());
119+
} else if (getIntDomain() != null) {
120+
featureSpecBuilder.setIntDomain(IntDomain.parseFrom(getIntDomain()));
121+
} else if (getFloatDomain() != null) {
122+
featureSpecBuilder.setFloatDomain(FloatDomain.parseFrom(getFloatDomain()));
123+
} else if (getStringDomain() != null) {
124+
featureSpecBuilder.setStringDomain(StringDomain.parseFrom(getStringDomain()));
125+
} else if (getBoolDomain() != null) {
126+
featureSpecBuilder.setBoolDomain(BoolDomain.parseFrom(getBoolDomain()));
127+
} else if (getStructDomain() != null) {
128+
featureSpecBuilder.setStructDomain(StructDomain.parseFrom(getStructDomain()));
129+
} else if (getNaturalLanguageDomain() != null) {
130+
featureSpecBuilder.setNaturalLanguageDomain(
131+
NaturalLanguageDomain.parseFrom(getNaturalLanguageDomain()));
132+
} else if (getImageDomain() != null) {
133+
featureSpecBuilder.setImageDomain(ImageDomain.parseFrom(getImageDomain()));
134+
} else if (getMidDomain() != null) {
135+
featureSpecBuilder.setMidDomain(MIDDomain.parseFrom(getMidDomain()));
136+
} else if (getUrlDomain() != null) {
137+
featureSpecBuilder.setUrlDomain(URLDomain.parseFrom(getUrlDomain()));
138+
} else if (getTimeDomain() != null) {
139+
featureSpecBuilder.setTimeDomain(TimeDomain.parseFrom(getTimeDomain()));
140+
} else if (getTimeOfDayDomain() != null) {
141+
featureSpecBuilder.setTimeOfDayDomain(TimeOfDayDomain.parseFrom(getTimeOfDayDomain()));
142+
}
143+
144+
if (getLabels() != null) {
145+
featureSpecBuilder.putAllLabels(getLabels());
146+
}
147+
return featureSpecBuilder.build();
148+
}
149+
150+
private void updateSchema(FeatureSpec featureSpec) {
92151
switch (featureSpec.getPresenceConstraintsCase()) {
93152
case PRESENCE:
94-
feature.setPresence(featureSpec.getPresence().toByteArray());
153+
setPresence(featureSpec.getPresence().toByteArray());
95154
break;
96155
case GROUP_PRESENCE:
97-
feature.setGroupPresence(featureSpec.getGroupPresence().toByteArray());
156+
setGroupPresence(featureSpec.getGroupPresence().toByteArray());
98157
break;
99158
case PRESENCECONSTRAINTS_NOT_SET:
100159
break;
101160
}
102161

103162
switch (featureSpec.getShapeTypeCase()) {
104163
case SHAPE:
105-
feature.setShape(featureSpec.getShape().toByteArray());
164+
setShape(featureSpec.getShape().toByteArray());
106165
break;
107166
case VALUE_COUNT:
108-
feature.setValueCount(featureSpec.getValueCount().toByteArray());
167+
setValueCount(featureSpec.getValueCount().toByteArray());
109168
break;
110169
case SHAPETYPE_NOT_SET:
111170
break;
112171
}
113172

114173
switch (featureSpec.getDomainInfoCase()) {
115174
case DOMAIN:
116-
feature.setDomain(featureSpec.getDomain());
175+
setDomain(featureSpec.getDomain());
117176
break;
118177
case INT_DOMAIN:
119-
feature.setIntDomain(featureSpec.getIntDomain().toByteArray());
178+
setIntDomain(featureSpec.getIntDomain().toByteArray());
120179
break;
121180
case FLOAT_DOMAIN:
122-
feature.setFloatDomain(featureSpec.getFloatDomain().toByteArray());
181+
setFloatDomain(featureSpec.getFloatDomain().toByteArray());
123182
break;
124183
case STRING_DOMAIN:
125-
feature.setStringDomain(featureSpec.getStringDomain().toByteArray());
184+
setStringDomain(featureSpec.getStringDomain().toByteArray());
126185
break;
127186
case BOOL_DOMAIN:
128-
feature.setBoolDomain(featureSpec.getBoolDomain().toByteArray());
187+
setBoolDomain(featureSpec.getBoolDomain().toByteArray());
129188
break;
130189
case STRUCT_DOMAIN:
131-
feature.setStructDomain(featureSpec.getStructDomain().toByteArray());
190+
setStructDomain(featureSpec.getStructDomain().toByteArray());
132191
break;
133192
case NATURAL_LANGUAGE_DOMAIN:
134-
feature.setNaturalLanguageDomain(featureSpec.getNaturalLanguageDomain().toByteArray());
193+
setNaturalLanguageDomain(featureSpec.getNaturalLanguageDomain().toByteArray());
135194
break;
136195
case IMAGE_DOMAIN:
137-
feature.setImageDomain(featureSpec.getImageDomain().toByteArray());
196+
setImageDomain(featureSpec.getImageDomain().toByteArray());
138197
break;
139198
case MID_DOMAIN:
140-
feature.setMidDomain(featureSpec.getMidDomain().toByteArray());
199+
setMidDomain(featureSpec.getMidDomain().toByteArray());
141200
break;
142201
case URL_DOMAIN:
143-
feature.setUrlDomain(featureSpec.getUrlDomain().toByteArray());
202+
setUrlDomain(featureSpec.getUrlDomain().toByteArray());
144203
break;
145204
case TIME_DOMAIN:
146-
feature.setTimeDomain(featureSpec.getTimeDomain().toByteArray());
205+
setTimeDomain(featureSpec.getTimeDomain().toByteArray());
147206
break;
148207
case TIME_OF_DAY_DOMAIN:
149-
feature.setTimeOfDayDomain(featureSpec.getTimeOfDayDomain().toByteArray());
208+
setTimeOfDayDomain(featureSpec.getTimeOfDayDomain().toByteArray());
150209
break;
151210
case DOMAININFO_NOT_SET:
152211
break;
153212
}
154-
return feature;
213+
}
214+
215+
/** Archive this feature. */
216+
public void archive() {
217+
this.archived = true;
218+
}
219+
220+
/**
221+
* Update the feature object with a valid feature spec. Only schema changes are allowed.
222+
*
223+
* @param featureSpec {@link FeatureSpec} containing schema changes.
224+
*/
225+
public void updateFromProto(FeatureSpec featureSpec) {
226+
if (isArchived()) {
227+
throw new IllegalArgumentException(
228+
String.format(
229+
"You are attempting to create a feature %s that was previously archived. This isn't allowed. Please create a new feature with a different name.",
230+
featureSpec.getName()));
231+
}
232+
if (ValueType.Enum.valueOf(type) != featureSpec.getValueType()) {
233+
throw new IllegalArgumentException(
234+
String.format(
235+
"You are attempting to change the type of feature %s from %s to %s. This isn't allowed. Please create a new feature.",
236+
featureSpec.getName(), type, featureSpec.getValueType()));
237+
}
238+
updateSchema(featureSpec);
155239
}
156240

157241
public Map<String, String> getLabels() {
@@ -167,7 +251,9 @@ public boolean equals(Object o) {
167251
return false;
168252
}
169253
Feature feature = (Feature) o;
170-
return Objects.equals(getName(), feature.getName())
254+
return getName().equals(feature.getName())
255+
&& getType().equals(feature.getType())
256+
&& isArchived() == (feature.isArchived())
171257
&& Objects.equals(getLabels(), feature.getLabels())
172258
&& Arrays.equals(getPresence(), feature.getPresence())
173259
&& Arrays.equals(getGroupPresence(), feature.getGroupPresence())

0 commit comments

Comments
 (0)