Skip to content

Commit 772e39c

Browse files
author
zhilingc
committed
Merge branch 'zl/0.3-dev-storage-opts' into 0.3-dev
2 parents 055c9c6 + 0fca98e commit 772e39c

27 files changed

Lines changed: 640 additions & 537 deletions

core/src/main/java/feast/core/config/FeastProperties.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,21 +11,10 @@
1111
public class FeastProperties {
1212

1313
private String version;
14-
private StoreProperties store;
1514
private JobProperties jobs;
1615
private StreamProperties stream;
1716
private StatsdProperties statsd;
1817

19-
@Getter
20-
@Setter
21-
public static class StoreProperties {
22-
23-
private String servingType;
24-
private Map<String, String> servingOptions;
25-
private String warehouseType;
26-
private Map<String, String> warehouseOptions;
27-
}
28-
2918
@Getter
3019
@Setter
3120
public static class JobProperties {

core/src/main/java/feast/core/config/StoreConfig.java

Lines changed: 0 additions & 84 deletions
This file was deleted.

core/src/main/java/feast/core/grpc/CoreServiceImpl.java

Lines changed: 44 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -29,32 +29,38 @@
2929
import feast.core.CoreServiceProto.GetStoresRequest;
3030
import feast.core.CoreServiceProto.GetStoresRequest.Filter;
3131
import feast.core.CoreServiceProto.GetStoresResponse;
32+
import feast.core.CoreServiceProto.UpdateStoreRequest;
33+
import feast.core.CoreServiceProto.UpdateStoreResponse;
34+
import feast.core.CoreServiceProto.UpdateStoreResponse.Status;
3235
import feast.core.FeatureSetProto.FeatureSetSpec;
3336
import feast.core.StoreProto.Store;
3437
import feast.core.StoreProto.Store.Subscription;
3538
import feast.core.exception.RetrievalException;
3639
import feast.core.service.JobCoordinatorService;
3740
import feast.core.service.SpecService;
38-
import io.grpc.Status;
39-
import io.grpc.StatusRuntimeException;
4041
import io.grpc.stub.StreamObserver;
4142
import java.util.ArrayList;
43+
import java.util.HashSet;
4244
import java.util.List;
45+
import java.util.Set;
4346
import java.util.regex.Pattern;
4447
import java.util.stream.Collectors;
4548
import lombok.extern.slf4j.Slf4j;
4649
import org.lognet.springboot.grpc.GRpcService;
4750
import org.springframework.beans.factory.annotation.Autowired;
4851
import org.springframework.transaction.annotation.Transactional;
4952

50-
/** Implementation of the feast core GRPC service. */
53+
/**
54+
* Implementation of the feast core GRPC service.
55+
*/
5156
@Slf4j
5257
@GRpcService
5358
public class CoreServiceImpl extends CoreServiceImplBase {
5459

55-
@Autowired private SpecService specService;
56-
@Autowired private StatsDClient statsDClient;
57-
@Autowired private JobCoordinatorService jobCoordinatorService;
60+
@Autowired
61+
private SpecService specService;
62+
@Autowired
63+
private JobCoordinatorService jobCoordinatorService;
5864

5965
@Override
6066
public void getFeastCoreVersion(
@@ -72,7 +78,7 @@ public void getFeatureSets(
7278
responseObserver.onNext(response);
7379
responseObserver.onCompleted();
7480
} catch (RetrievalException | InvalidProtocolBufferException e) {
75-
responseObserver.onError(getRuntimeException(e));
81+
responseObserver.onError(e);
7682
}
7783
}
7884

@@ -85,7 +91,7 @@ public void getStores(
8591
responseObserver.onNext(response);
8692
responseObserver.onCompleted();
8793
} catch (RetrievalException e) {
88-
responseObserver.onError(getRuntimeException(e));
94+
responseObserver.onError(e);
8995
}
9096
}
9197

@@ -124,17 +130,39 @@ public void applyFeatureSet(
124130
responseObserver.onNext(response);
125131
responseObserver.onCompleted();
126132
} catch (Exception e) {
127-
responseObserver.onError(getRuntimeException(e));
133+
responseObserver.onError(e);
128134
}
129135
}
130136

131-
private StatusRuntimeException getRuntimeException(Exception e) {
132-
return new StatusRuntimeException(
133-
Status.fromCode(Status.Code.INTERNAL).withDescription(e.getMessage()).withCause(e));
134-
}
137+
@Override
138+
public void updateStore(UpdateStoreRequest request,
139+
StreamObserver<UpdateStoreResponse> responseObserver) {
140+
try {
141+
UpdateStoreResponse response = specService.updateStore(request);
142+
responseObserver.onNext(response);
143+
responseObserver.onCompleted();
135144

136-
private StatusRuntimeException getBadRequestException(Exception e) {
137-
return new StatusRuntimeException(
138-
Status.fromCode(Status.Code.OUT_OF_RANGE).withDescription(e.getMessage()).withCause(e));
145+
if (!response.getStatus().equals(Status.NO_CHANGE)) {
146+
Set<FeatureSetSpec> featureSetSpecs = new HashSet<>();
147+
Store store = response.getStore();
148+
for (Subscription subscription : store.getSubscriptionsList()) {
149+
featureSetSpecs.addAll(
150+
specService.getFeatureSets(
151+
GetFeatureSetsRequest.Filter.newBuilder()
152+
.setFeatureSetName(subscription.getName())
153+
.setFeatureSetVersion(subscription.getVersion())
154+
.build())
155+
.getFeatureSetsList()
156+
);
157+
}
158+
featureSetSpecs.stream()
159+
.collect(Collectors.groupingBy(FeatureSetSpec::getName))
160+
.entrySet()
161+
.stream()
162+
.forEach(kv -> jobCoordinatorService.startOrUpdateJob(kv.getValue(), store));
163+
}
164+
} catch (Exception e) {
165+
responseObserver.onError(e);
166+
}
139167
}
140168
}

core/src/main/java/feast/core/service/SpecService.java

Lines changed: 43 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,10 @@
2626
import feast.core.CoreServiceProto.GetStoresRequest;
2727
import feast.core.CoreServiceProto.GetStoresResponse;
2828
import feast.core.CoreServiceProto.GetStoresResponse.Builder;
29+
import feast.core.CoreServiceProto.UpdateStoreRequest;
30+
import feast.core.CoreServiceProto.UpdateStoreResponse;
2931
import feast.core.FeatureSetProto.FeatureSetSpec;
32+
import feast.core.StoreProto;
3033
import feast.core.dao.FeatureSetRepository;
3134
import feast.core.dao.StoreRepository;
3235
import feast.core.exception.RetrievalException;
@@ -134,7 +137,10 @@ public GetStoresResponse getStores(GetStoresRequest.Filter filter) {
134137
.addStore(store.toProto())
135138
.build();
136139
} catch (InvalidProtocolBufferException e) {
137-
throw new RetrievalException("Unable to retrieve stores", e);
140+
throw io.grpc.Status.NOT_FOUND
141+
.withDescription("Unable to retrieve stores")
142+
.withCause(e)
143+
.asRuntimeException();
138144
}
139145
}
140146

@@ -188,6 +194,34 @@ public ApplyFeatureSetResponse applyFeatureSet(FeatureSetSpec newFeatureSetSpec)
188194
.build();
189195
}
190196

197+
/**
198+
* UpdateStore updates the repository with the new given store.
199+
*
200+
* @param updateStoreRequest containing the new store definition
201+
* @return UpdateStoreResponse containing the new store definition
202+
* @throws InvalidProtocolBufferException
203+
*/
204+
public UpdateStoreResponse updateStore(UpdateStoreRequest updateStoreRequest)
205+
throws InvalidProtocolBufferException {
206+
StoreProto.Store newStoreProto = updateStoreRequest.getStore();
207+
Store existingStore = storeRepository.findById(newStoreProto.getName()).orElse(null);
208+
209+
// Do nothing if no change
210+
if (existingStore != null && existingStore.toProto().equals(newStoreProto)) {
211+
return UpdateStoreResponse.newBuilder()
212+
.setStatus(UpdateStoreResponse.Status.NO_CHANGE)
213+
.setStore(updateStoreRequest.getStore())
214+
.build();
215+
}
216+
217+
Store newStore = Store.fromProto(newStoreProto);
218+
storeRepository.save(newStore);
219+
return UpdateStoreResponse.newBuilder()
220+
.setStatus(UpdateStoreResponse.Status.UPDATED)
221+
.setStore(updateStoreRequest.getStore())
222+
.build();
223+
}
224+
191225
private Predicate<? super FeatureSet> getVersionFilter(String versionFilter) {
192226
if (versionFilter.equals("")) {
193227
return v -> true;
@@ -196,11 +230,12 @@ private Predicate<? super FeatureSet> getVersionFilter(String versionFilter) {
196230
match.find();
197231

198232
if (!match.matches()) {
199-
throw new RetrievalException(
200-
String.format(
233+
throw io.grpc.Status.INVALID_ARGUMENT
234+
.withDescription(String.format(
201235
"Invalid version string '%s' provided. Version string may either "
202236
+ "be a fixed version, e.g. 10, or contain a comparator, e.g. >10.",
203-
versionFilter));
237+
versionFilter))
238+
.asRuntimeException();
204239
}
205240

206241
int versionNumber = Integer.valueOf(match.group("version"));
@@ -217,11 +252,12 @@ private Predicate<? super FeatureSet> getVersionFilter(String versionFilter) {
217252
case "":
218253
return v -> v.getVersion() == versionNumber;
219254
default:
220-
throw new RetrievalException(
221-
String.format(
255+
throw io.grpc.Status.INVALID_ARGUMENT
256+
.withDescription(String.format(
222257
"Invalid comparator '%s' provided. Version string may either "
223258
+ "be a fixed version, e.g. 10, or contain a comparator, e.g. >10.",
224-
comparator));
259+
comparator))
260+
.asRuntimeException();
225261
}
226262
}
227263

core/src/main/resources/application.yml

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -24,29 +24,6 @@ grpc:
2424

2525
feast:
2626
# version: @project.version@
27-
store:
28-
# Serving store type. For more information on the available types
29-
# see Store.proto. To provision feast without a serving store,
30-
# leave this value blank.
31-
serving-type: REDIS
32-
# Serving store options. The necessary option values depend on the
33-
# store. For more information, see Store.proto.
34-
serving-options:
35-
host: localhost
36-
port: 6379
37-
# Store subscriptions, as a comma delimited string of featureName:versions
38-
# to populate this store with. Regex is supported for featureName, and
39-
# expressions for version.
40-
subscriptions: ".*:>0"
41-
# Warehouse store type. For more information on the available types
42-
# see Store.proto. To provision feast without a warehouse store,
43-
# leave this value blank.
44-
warehouse-type: BIGQUERY
45-
warehouse-options:
46-
projectId: my-google-project-id
47-
datasetId: my-bigquery-dataset-id
48-
# Same as feast.store.serving-options.subscriptions
49-
subscriptions: ".*:>0"
5027
jobs:
5128
# Runner type for feature population jobs. Currently supported runner types are
5229
# DirectRunner and DataflowRunner.

0 commit comments

Comments
 (0)