Skip to content

Commit 8ec1f8a

Browse files
woopfeast-ci-bot
authored andcommitted
Refactored Core API: ListFeatureSets, ListStore, and GetFeatureSet (feast-dev#309)
* Refactored GetFeatureSets and GetStores to ListFeatureSets, ListStores, and GetFeatureSet * Updated and implemented in Core * Updated and implemented in Python SDK * Updated serving * Added basic test for wildcard feature list filter * Remove unnecessary asserts from e2e tests * Add handling for wildcard subscriptions in ingestion * * Fix handling of wildcards in core and update prow * Fix Python Kafka producer bug * Fix comparison between feature sets failing * Fix e2e tests failing due to recreating dataframes * Replace IllegalArgumentException with gRPC RuntimeException Co-Authored-By: David Heryanto <david.heryanto@hotmail.com> * Remove confluent-kafka and replace with kafka-python
1 parent b1657e2 commit 8ec1f8a

34 files changed

Lines changed: 1053 additions & 744 deletions

File tree

.prow/scripts/test-end-to-end.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ redis_config:
140140
host: localhost
141141
port: 6379
142142
subscriptions:
143-
- name: .*
143+
- name: "*"
144144
version: ">0"
145145
EOF
146146

CONTRIBUTING.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ store {
6464
name: "SERVING"
6565
type: REDIS
6666
subscriptions {
67-
name: ".*"
67+
name: "*"
6868
version: ">0"
6969
}
7070
redis_config {
@@ -76,7 +76,7 @@ store {
7676
name: "WAREHOUSE"
7777
type: BIGQUERY
7878
subscriptions {
79-
name: ".*"
79+
name: "*"
8080
version: ">0"
8181
}
8282
bigquery_config {

core/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,5 +28,5 @@ If you have [grpc_cli](https://github.com/grpc/grpc/blob/master/doc/command_line
2828
```
2929
grpc_cli ls localhost:6565
3030
grpc_cli call localhost:6565 GetFeastCoreVersion ""
31-
grpc_cli call localhost:6565 GetStores ""
31+
grpc_cli call localhost:6565 ListStores ""
3232
```

core/src/main/java/feast/core/dao/FeatureSetRepository.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,5 @@ public interface FeatureSetRepository extends JpaRepository<FeatureSet, String>
1212

1313
// find all versions of featureSets with names matching the regex
1414
@Query(nativeQuery=true, value="SELECT * FROM feature_sets WHERE name LIKE ?1")
15-
List<FeatureSet> findByNameRegex(String regex);
15+
List<FeatureSet> findByNameWithWildcard(String name);
1616
}

core/src/main/java/feast/core/grpc/CoreServiceImpl.java

Lines changed: 40 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,13 @@
2424
import feast.core.CoreServiceProto.ApplyFeatureSetResponse;
2525
import feast.core.CoreServiceProto.GetFeastCoreVersionRequest;
2626
import feast.core.CoreServiceProto.GetFeastCoreVersionResponse;
27-
import feast.core.CoreServiceProto.GetFeatureSetsRequest;
28-
import feast.core.CoreServiceProto.GetFeatureSetsResponse;
29-
import feast.core.CoreServiceProto.GetStoresRequest;
30-
import feast.core.CoreServiceProto.GetStoresRequest.Filter;
31-
import feast.core.CoreServiceProto.GetStoresResponse;
27+
import feast.core.CoreServiceProto.GetFeatureSetRequest;
28+
import feast.core.CoreServiceProto.GetFeatureSetResponse;
29+
import feast.core.CoreServiceProto.ListFeatureSetsRequest;
30+
import feast.core.CoreServiceProto.ListFeatureSetsResponse;
31+
import feast.core.CoreServiceProto.ListStoresRequest;
32+
import feast.core.CoreServiceProto.ListStoresRequest.Filter;
33+
import feast.core.CoreServiceProto.ListStoresResponse;
3234
import feast.core.CoreServiceProto.UpdateStoreRequest;
3335
import feast.core.CoreServiceProto.UpdateStoreResponse;
3436
import feast.core.CoreServiceProto.UpdateStoreResponse.Status;
@@ -37,11 +39,9 @@
3739
import feast.core.StoreProto.Store;
3840
import feast.core.StoreProto.Store.Subscription;
3941
import feast.core.exception.RetrievalException;
40-
import feast.core.model.Source;
4142
import feast.core.service.JobCoordinatorService;
4243
import feast.core.service.SpecService;
4344
import io.grpc.stub.StreamObserver;
44-
import java.util.ArrayList;
4545
import java.util.HashSet;
4646
import java.util.List;
4747
import java.util.Set;
@@ -73,28 +73,42 @@ public void getFeastCoreVersion(
7373

7474
@Override
7575
@Transactional
76-
public void getFeatureSets(
77-
GetFeatureSetsRequest request, StreamObserver<GetFeatureSetsResponse> responseObserver) {
76+
public void getFeatureSet(
77+
GetFeatureSetRequest request, StreamObserver<GetFeatureSetResponse> responseObserver) {
7878
try {
79-
GetFeatureSetsResponse response = specService.getFeatureSets(request.getFilter());
79+
GetFeatureSetResponse response = specService.getFeatureSet(request);
8080
responseObserver.onNext(response);
8181
responseObserver.onCompleted();
8282
} catch (RetrievalException | InvalidProtocolBufferException e) {
83-
log.error("Exception has occurred in GetFeatureSets method: ", e);
83+
log.error("Exception has occurred in GetFeatureSet method: ", e);
8484
responseObserver.onError(e);
8585
}
8686
}
8787

8888
@Override
8989
@Transactional
90-
public void getStores(
91-
GetStoresRequest request, StreamObserver<GetStoresResponse> responseObserver) {
90+
public void listFeatureSets(
91+
ListFeatureSetsRequest request, StreamObserver<ListFeatureSetsResponse> responseObserver) {
9292
try {
93-
GetStoresResponse response = specService.getStores(request.getFilter());
93+
ListFeatureSetsResponse response = specService.listFeatureSets(request.getFilter());
94+
responseObserver.onNext(response);
95+
responseObserver.onCompleted();
96+
} catch (RetrievalException | InvalidProtocolBufferException e) {
97+
log.error("Exception has occurred in ListFeatureSet method: ", e);
98+
responseObserver.onError(e);
99+
}
100+
}
101+
102+
@Override
103+
@Transactional
104+
public void listStores(
105+
ListStoresRequest request, StreamObserver<ListStoresResponse> responseObserver) {
106+
try {
107+
ListStoresResponse response = specService.listStores(request.getFilter());
94108
responseObserver.onNext(response);
95109
responseObserver.onCompleted();
96110
} catch (RetrievalException e) {
97-
log.error("Exception has occurred in GetStores method: ", e);
111+
log.error("Exception has occurred in ListStores method: ", e);
98112
responseObserver.onError(e);
99113
}
100114
}
@@ -106,22 +120,27 @@ public void applyFeatureSet(
106120
try {
107121
ApplyFeatureSetResponse response = specService.applyFeatureSet(request.getFeatureSet());
108122
String featureSetName = response.getFeatureSet().getName();
109-
GetStoresResponse stores = specService.getStores(Filter.newBuilder().build());
123+
ListStoresResponse stores = specService.listStores(Filter.newBuilder().build());
110124
for (Store store : stores.getStoreList()) {
111125
List<Subscription> relevantSubscriptions =
112126
store.getSubscriptionsList().stream()
113127
.filter(
114128
sub -> {
115-
Pattern p = Pattern.compile(sub.getName());
129+
String subString = sub.getName();
130+
if (!subString.contains(".*"))
131+
{
132+
subString = subString.replace("*", ".*");
133+
}
134+
Pattern p = Pattern.compile(subString);
116135
return p.matcher(featureSetName).matches();
117136
})
118137
.collect(Collectors.toList());
119138
Set<FeatureSetSpec> featureSetSpecs = new HashSet<>();
120139
for (Subscription subscription : relevantSubscriptions) {
121140
featureSetSpecs.addAll(
122141
specService
123-
.getFeatureSets(
124-
GetFeatureSetsRequest.Filter.newBuilder()
142+
.listFeatureSets(
143+
ListFeatureSetsRequest.Filter.newBuilder()
125144
.setFeatureSetName(subscription.getName())
126145
.setFeatureSetVersion(subscription.getVersion())
127146
.build())
@@ -157,8 +176,8 @@ public void updateStore(UpdateStoreRequest request,
157176
Store store = response.getStore();
158177
for (Subscription subscription : store.getSubscriptionsList()) {
159178
featureSetSpecs.addAll(
160-
specService.getFeatureSets(
161-
GetFeatureSetsRequest.Filter.newBuilder()
179+
specService.listFeatureSets(
180+
ListFeatureSetsRequest.Filter.newBuilder()
162181
.setFeatureSetName(subscription.getName())
163182
.setFeatureSetVersion(subscription.getVersion())
164183
.build())

core/src/main/java/feast/core/service/SpecService.java

Lines changed: 76 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,20 @@
1717

1818
package feast.core.service;
1919

20+
import static feast.core.validators.Matchers.checkValidCharacters;
21+
import static feast.core.validators.Matchers.checkValidFeatureSetFilterName;
22+
2023
import com.google.common.collect.Ordering;
2124
import com.google.protobuf.InvalidProtocolBufferException;
2225
import feast.core.CoreServiceProto.ApplyFeatureSetResponse;
2326
import feast.core.CoreServiceProto.ApplyFeatureSetResponse.Status;
24-
import feast.core.CoreServiceProto.GetFeatureSetsRequest;
25-
import feast.core.CoreServiceProto.GetFeatureSetsResponse;
26-
import feast.core.CoreServiceProto.GetStoresRequest;
27-
import feast.core.CoreServiceProto.GetStoresResponse;
28-
import feast.core.CoreServiceProto.GetStoresResponse.Builder;
27+
import feast.core.CoreServiceProto.GetFeatureSetRequest;
28+
import feast.core.CoreServiceProto.GetFeatureSetResponse;
29+
import feast.core.CoreServiceProto.ListFeatureSetsRequest;
30+
import feast.core.CoreServiceProto.ListFeatureSetsResponse;
31+
import feast.core.CoreServiceProto.ListStoresRequest;
32+
import feast.core.CoreServiceProto.ListStoresResponse;
33+
import feast.core.CoreServiceProto.ListStoresResponse.Builder;
2934
import feast.core.CoreServiceProto.UpdateStoreRequest;
3035
import feast.core.CoreServiceProto.UpdateStoreResponse;
3136
import feast.core.FeatureSetProto.FeatureSetSpec;
@@ -72,6 +77,61 @@ public SpecService(
7277
this.defaultSource = defaultSource;
7378
}
7479

80+
/**
81+
* Get a feature set matching the feature name and version provided in the filter. The name
82+
* is required. If the version is provided then it will be used for the lookup. If the version
83+
* is omitted then the latest version will be returned.
84+
*
85+
* @param GetFeatureSetRequest containing the name and version of the feature set
86+
* @return GetFeatureSetResponse containing a single feature set
87+
*/
88+
public GetFeatureSetResponse getFeatureSet(GetFeatureSetRequest request)
89+
throws InvalidProtocolBufferException {
90+
91+
// Validate input arguments
92+
checkValidCharacters(request.getName(), "featureSetName");
93+
if (request.getName().isEmpty()) {
94+
throw io.grpc.Status.INVALID_ARGUMENT
95+
.withDescription("No feature set name provided")
96+
.asRuntimeException();
97+
}
98+
if (request.getVersion() < 0){
99+
throw io.grpc.Status.INVALID_ARGUMENT
100+
.withDescription("Version number cannot be less than 0")
101+
.asRuntimeException();
102+
}
103+
104+
// Find a list of feature sets with the requested name
105+
List<FeatureSet> featureSets = featureSetRepository.findByNameWithWildcard(request.getName());
106+
107+
// Filter the list based on version
108+
if (request.getVersion() == 0){
109+
// Version is not set, filter list to latest version
110+
featureSets = Ordering.natural().reverse()
111+
.sortedCopy(featureSets).subList(0, featureSets.size() == 0 ? 0 : 1);
112+
} else if(request.getVersion() > 0) {
113+
// Version is set, find specific version
114+
featureSets = featureSets.stream()
115+
.filter(fs -> request.getVersion() == fs.getVersion()).collect(Collectors.toList());
116+
}
117+
118+
// Validate remaining items
119+
if (featureSets.size() == 0){
120+
throw io.grpc.Status.NOT_FOUND
121+
.withDescription("Feature set could not be found")
122+
.asRuntimeException();
123+
}
124+
if (featureSets.size() > 1){
125+
throw io.grpc.Status.INTERNAL
126+
.withDescription(String.format("Multiple feature sets found with the name %s and "
127+
+ "version %s", request.getName(), request.getVersion()))
128+
.asRuntimeException();
129+
}
130+
131+
// Only a single item in list, return successfully
132+
return GetFeatureSetResponse.newBuilder().setFeatureSet(featureSets.get(0).toProto()).build();
133+
}
134+
75135
/**
76136
* Get featureSets matching the feature name and version provided in the filter. If the feature
77137
* name is not provided, the method will return all featureSets currently registered to Feast.
@@ -84,25 +144,21 @@ public SpecService(
84144
* comparator (<, <=, >, etc) and a version number, e.g. 10, <10, >=1
85145
*
86146
* @param filter filter containing the desired featureSet name and version filter
87-
* @return GetFeatureSetsResponse with list of featureSets found matching the filter
147+
* @return ListFeatureSetsResponse with list of featureSets found matching the filter
88148
*/
89-
public GetFeatureSetsResponse getFeatureSets(GetFeatureSetsRequest.Filter filter)
149+
public ListFeatureSetsResponse listFeatureSets(ListFeatureSetsRequest.Filter filter)
90150
throws InvalidProtocolBufferException {
91151
String name = filter.getFeatureSetName();
152+
checkValidFeatureSetFilterName(name, "featureSetName");
92153
List<FeatureSet> featureSets;
93154
if (name.equals("")) {
94155
featureSets = featureSetRepository.findAll();
95156
} else {
96-
featureSets = featureSetRepository.findByNameRegex(name);
97-
if (filter.getFeatureSetVersion().equals("latest")) {
98-
featureSets = Ordering.natural().reverse()
99-
.sortedCopy(featureSets).subList(0, featureSets.size() == 0 ? 0 : 1);
100-
} else {
101-
featureSets = featureSets.stream().filter(getVersionFilter(filter.getFeatureSetVersion()))
102-
.collect(Collectors.toList());
103-
}
157+
featureSets = featureSetRepository.findByNameWithWildcard(name.replace('*', '%'));
158+
featureSets = featureSets.stream().filter(getVersionFilter(filter.getFeatureSetVersion()))
159+
.collect(Collectors.toList());
104160
}
105-
GetFeatureSetsResponse.Builder response = GetFeatureSetsResponse.newBuilder();
161+
ListFeatureSetsResponse.Builder response = ListFeatureSetsResponse.newBuilder();
106162
for (FeatureSet featureSet : featureSets) {
107163
response.addFeatureSets(featureSet.toProto());
108164
}
@@ -114,13 +170,13 @@ public GetFeatureSetsResponse getFeatureSets(GetFeatureSetsRequest.Filter filter
114170
* the method will return all stores currently registered to Feast.
115171
*
116172
* @param filter filter containing the desired store name
117-
* @return GetStoresResponse containing list of stores found matching the filter
173+
* @return ListStoresResponse containing list of stores found matching the filter
118174
*/
119-
public GetStoresResponse getStores(GetStoresRequest.Filter filter) {
175+
public ListStoresResponse listStores(ListStoresRequest.Filter filter) {
120176
try {
121177
String name = filter.getName();
122178
if (name.equals("")) {
123-
Builder responseBuilder = GetStoresResponse.newBuilder();
179+
Builder responseBuilder = ListStoresResponse.newBuilder();
124180
for (Store store : storeRepository.findAll()) {
125181
responseBuilder.addStore(store.toProto());
126182
}
@@ -129,7 +185,7 @@ public GetStoresResponse getStores(GetStoresRequest.Filter filter) {
129185
Store store = storeRepository.findById(name)
130186
.orElseThrow(() -> new RetrievalException(String.format("Store with name '%s' not found",
131187
name)));
132-
return GetStoresResponse.newBuilder()
188+
return ListStoresResponse.newBuilder()
133189
.addStore(store.toProto())
134190
.build();
135191
} catch (InvalidProtocolBufferException e) {

core/src/main/java/feast/core/validators/Matchers.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,14 @@
1717

1818
package feast.core.validators;
1919

20-
import com.google.common.base.Strings;
21-
2220
import java.util.regex.Pattern;
2321

2422
public class Matchers {
2523

2624
private static Pattern UPPER_SNAKE_CASE_REGEX = Pattern.compile("^[A-Z0-9]+(_[A-Z0-9]+)*$");
2725
private static Pattern LOWER_SNAKE_CASE_REGEX = Pattern.compile("^[a-z0-9]+(_[a-z0-9]+)*$");
2826
private static Pattern VALID_CHARACTERS_REGEX = Pattern.compile("^[a-zA-Z0-9\\-_]*$");
27+
private static Pattern VALID_CHARACTERS_FSET_FILTER_REGEX = Pattern.compile("^[a-zA-Z0-9\\-_*]*$");
2928

3029
private static String ERROR_MESSAGE_TEMPLATE = "invalid value for field %s: %s";
3130

@@ -61,4 +60,15 @@ public static void checkValidCharacters(String input, String fieldName)
6160
"argument must only contain alphanumeric characters, dashes and underscores."));
6261
}
6362
}
63+
64+
public static void checkValidFeatureSetFilterName(String input, String fieldName)
65+
throws IllegalArgumentException {
66+
if (!VALID_CHARACTERS_FSET_FILTER_REGEX.matcher(input).matches()) {
67+
throw new IllegalArgumentException(
68+
String.format(
69+
ERROR_MESSAGE_TEMPLATE,
70+
fieldName,
71+
"argument must only contain alphanumeric characters, dashes, underscores, or an asterisk."));
72+
}
73+
}
6474
}

0 commit comments

Comments
 (0)