Skip to content

Commit 8ff9cde

Browse files
author
zhilingc
committed
Add flag for status change in store update, add tests
1 parent 9232903 commit 8ff9cde

5 files changed

Lines changed: 70 additions & 43 deletions

File tree

core/src/main/java/feast/core/grpc/CoreServiceImpl.java

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import feast.core.CoreServiceProto.GetStoresResponse;
3232
import feast.core.CoreServiceProto.UpdateStoreRequest;
3333
import feast.core.CoreServiceProto.UpdateStoreResponse;
34+
import feast.core.CoreServiceProto.UpdateStoreResponse.Status;
3435
import feast.core.FeatureSetProto.FeatureSetSpec;
3536
import feast.core.StoreProto.Store;
3637
import feast.core.StoreProto.Store.Subscription;
@@ -59,8 +60,6 @@ public class CoreServiceImpl extends CoreServiceImplBase {
5960
@Autowired
6061
private SpecService specService;
6162
@Autowired
62-
private StatsDClient statsDClient;
63-
@Autowired
6463
private JobCoordinatorService jobCoordinatorService;
6564

6665
@Override
@@ -143,23 +142,25 @@ public void updateStore(UpdateStoreRequest request,
143142
responseObserver.onNext(response);
144143
responseObserver.onCompleted();
145144

146-
Set<FeatureSetSpec> featureSetSpecs = new HashSet<>();
147-
Store store = response.getStore();
148-
for (Subscription subscription : store.getSubscriptionsList()) {
149-
featureSetSpecs.addAll(
150-
specService.getFeatureSets(
151-
GetFeatureSetsRequest.Filter.newBuilder()
152-
.setFeatureSetName(subscription.getName())
153-
.setFeatureSetVersion(subscription.getVersion())
154-
.build())
155-
.getFeatureSetsList()
156-
);
145+
if (!response.getStatus().equals(Status.NO_CHANGE)) {
146+
Set<FeatureSetSpec> featureSetSpecs = new HashSet<>();
147+
Store store = response.getStore();
148+
for (Subscription subscription : store.getSubscriptionsList()) {
149+
featureSetSpecs.addAll(
150+
specService.getFeatureSets(
151+
GetFeatureSetsRequest.Filter.newBuilder()
152+
.setFeatureSetName(subscription.getName())
153+
.setFeatureSetVersion(subscription.getVersion())
154+
.build())
155+
.getFeatureSetsList()
156+
);
157+
}
158+
featureSetSpecs.stream()
159+
.collect(Collectors.groupingBy(FeatureSetSpec::getName))
160+
.entrySet()
161+
.stream()
162+
.forEach(kv -> jobCoordinatorService.startOrUpdateJob(kv.getValue(), store));
157163
}
158-
featureSetSpecs.stream()
159-
.collect(Collectors.groupingBy(FeatureSetSpec::getName))
160-
.entrySet()
161-
.stream()
162-
.forEach(kv -> jobCoordinatorService.startOrUpdateJob(kv.getValue(), store));
163164
} catch (Exception e) {
164165
responseObserver.onError(e);
165166
}

core/src/main/java/feast/core/service/SpecService.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,13 +209,15 @@ public UpdateStoreResponse updateStore(UpdateStoreRequest updateStoreRequest)
209209
// Do nothing if no change
210210
if (existingStore != null && existingStore.toProto().equals(newStoreProto)) {
211211
return UpdateStoreResponse.newBuilder()
212+
.setStatus(UpdateStoreResponse.Status.NO_CHANGE)
212213
.setStore(updateStoreRequest.getStore())
213214
.build();
214215
}
215216

216217
Store newStore = Store.fromProto(newStoreProto);
217218
storeRepository.save(newStore);
218219
return UpdateStoreResponse.newBuilder()
220+
.setStatus(UpdateStoreResponse.Status.UPDATED)
219221
.setStore(updateStoreRequest.getStore())
220222
.build();
221223
}

core/src/main/resources/application.yml

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -24,29 +24,6 @@ grpc:
2424

2525
feast:
2626
# version: @project.version@
27-
store:
28-
# Serving store type. For more information on the available types
29-
# see Store.proto. To provision feast without a serving store,
30-
# leave this value blank.
31-
serving-type: REDIS
32-
# Serving store options. The necessary option values depend on the
33-
# store. For more information, see Store.proto.
34-
serving-options:
35-
host: localhost
36-
port: 6379
37-
# Store subscriptions, as a comma delimited string of featureName:versions
38-
# to populate this store with. Regex is supported for featureName, and
39-
# expressions for version.
40-
subscriptions: ".*:>0"
41-
# Warehouse store type. For more information on the available types
42-
# see Store.proto. To provision feast without a warehouse store,
43-
# leave this value blank.
44-
warehouse-type: BIGQUERY
45-
warehouse-options:
46-
projectId: my-google-project-id
47-
datasetId: my-bigquery-dataset-id
48-
# Same as feast.store.serving-options.subscriptions
49-
subscriptions: ".*:>0"
5027
jobs:
5128
# Runner type for feature population jobs. Currently supported runner types are
5229
# DirectRunner and DataflowRunner.

core/src/test/java/feast/core/service/SpecServiceTest.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,16 @@
3232
import feast.core.CoreServiceProto.GetFeatureSetsResponse;
3333
import feast.core.CoreServiceProto.GetStoresRequest;
3434
import feast.core.CoreServiceProto.GetStoresResponse;
35+
import feast.core.CoreServiceProto.UpdateStoreRequest;
36+
import feast.core.CoreServiceProto.UpdateStoreResponse;
3537
import feast.core.FeatureSetProto.FeatureSetSpec;
3638
import feast.core.FeatureSetProto.FeatureSpec;
3739
import feast.core.SourceProto.KafkaSourceConfig;
3840
import feast.core.SourceProto.SourceType;
41+
import feast.core.StoreProto;
3942
import feast.core.StoreProto.Store.RedisConfig;
4043
import feast.core.StoreProto.Store.StoreType;
44+
import feast.core.StoreProto.Store.Subscription;
4145
import feast.core.dao.FeatureSetRepository;
4246
import feast.core.dao.StoreRepository;
4347
import feast.core.exception.RetrievalException;
@@ -55,6 +59,7 @@
5559
import org.junit.Rule;
5660
import org.junit.Test;
5761
import org.junit.rules.ExpectedException;
62+
import org.mockito.ArgumentCaptor;
5863
import org.mockito.ArgumentMatchers;
5964
import org.mockito.Mock;
6065

@@ -328,6 +333,40 @@ public void applyFeatureSetShouldIncrementFeatureSetVersionIfAlreadyExists()
328333
assertThat(applyFeatureSetResponse.getFeatureSet(), equalTo(expected));
329334
}
330335

336+
@Test
337+
public void shouldUpdateStoreIfConfigChanges() throws InvalidProtocolBufferException {
338+
when(storeRepository.findById("SERVING")).thenReturn(Optional.of(stores.get(0)));
339+
StoreProto.Store newStore = StoreProto.Store.newBuilder()
340+
.setName("SERVING")
341+
.setType(StoreType.REDIS)
342+
.setRedisConfig(RedisConfig.newBuilder())
343+
.addSubscriptions(Subscription.newBuilder().setName("a").setVersion(">1"))
344+
.build();
345+
UpdateStoreResponse actual = specService
346+
.updateStore(UpdateStoreRequest.newBuilder().setStore(newStore).build());
347+
UpdateStoreResponse expected = UpdateStoreResponse.newBuilder()
348+
.setStore(newStore)
349+
.setStatus(UpdateStoreResponse.Status.UPDATED)
350+
.build();
351+
ArgumentCaptor<Store> argumentCaptor = ArgumentCaptor.forClass(Store.class);
352+
verify(storeRepository, times(1)).save(argumentCaptor.capture());
353+
assertThat(argumentCaptor.getValue().toProto(), equalTo(newStore));
354+
assertThat(actual, equalTo(expected));
355+
}
356+
357+
@Test
358+
public void shouldDoNothingIfNoChange() throws InvalidProtocolBufferException {
359+
when(storeRepository.findById("SERVING")).thenReturn(Optional.of(stores.get(0)));
360+
UpdateStoreResponse actual = specService
361+
.updateStore(UpdateStoreRequest.newBuilder().setStore(stores.get(0).toProto()).build());
362+
UpdateStoreResponse expected = UpdateStoreResponse.newBuilder()
363+
.setStore(stores.get(0).toProto())
364+
.setStatus(UpdateStoreResponse.Status.NO_CHANGE)
365+
.build();
366+
verify(storeRepository, times(0)).save(ArgumentMatchers.any());
367+
assertThat(actual, equalTo(expected));
368+
}
369+
331370
private FeatureSet newDummyFeatureSet(String name, int version) {
332371
KafkaSourceConfig kafkaFeatureSourceOptions =
333372
KafkaSourceConfig.newBuilder()

protos/feast/core/CoreService.proto

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,10 @@ service CoreService {
3737
// Returns all stores matching that filter.
3838
rpc GetStores (GetStoresRequest) returns (GetStoresResponse);
3939

40-
// Idempotent creation of feature set. Will not create a new feature set if schema does not change
40+
// Idempotent creation of feature set. Will not create a new feature set if schema does not change.
4141
rpc ApplyFeatureSet (ApplyFeatureSetRequest) returns (ApplyFeatureSetResponse);
4242

43-
// UpdateStore updates core with the configuration of the store. If the changes are valid,
43+
// Updates core with the configuration of the store. If the changes are valid,
4444
// core will return the given store configuration in response.
4545
rpc UpdateStore(UpdateStoreRequest) returns (UpdateStoreResponse);
4646
}
@@ -104,5 +104,13 @@ message UpdateStoreRequest {
104104
}
105105

106106
message UpdateStoreResponse {
107+
enum Status {
108+
// Existing store config matching the given store id is identical to the given store config.
109+
NO_CHANGE = 0;
110+
111+
// New store created or existing config updated.
112+
UPDATED = 1;
113+
}
107114
feast.core.Store store = 1;
115+
Status status = 2;
108116
}

0 commit comments

Comments
 (0)