2929import feast .core .CoreServiceProto .GetStoresRequest ;
3030import feast .core .CoreServiceProto .GetStoresRequest .Filter ;
3131import feast .core .CoreServiceProto .GetStoresResponse ;
32+ import feast .core .CoreServiceProto .UpdateStoreRequest ;
33+ import feast .core .CoreServiceProto .UpdateStoreResponse ;
34+ import feast .core .CoreServiceProto .UpdateStoreResponse .Status ;
3235import feast .core .FeatureSetProto .FeatureSetSpec ;
3336import feast .core .StoreProto .Store ;
3437import feast .core .StoreProto .Store .Subscription ;
3538import feast .core .exception .RetrievalException ;
3639import feast .core .service .JobCoordinatorService ;
3740import feast .core .service .SpecService ;
38- import io .grpc .Status ;
39- import io .grpc .StatusRuntimeException ;
4041import io .grpc .stub .StreamObserver ;
4142import java .util .ArrayList ;
43+ import java .util .HashSet ;
4244import java .util .List ;
45+ import java .util .Set ;
4346import java .util .regex .Pattern ;
4447import java .util .stream .Collectors ;
4548import lombok .extern .slf4j .Slf4j ;
4649import org .lognet .springboot .grpc .GRpcService ;
4750import org .springframework .beans .factory .annotation .Autowired ;
4851import org .springframework .transaction .annotation .Transactional ;
4952
50- /** Implementation of the feast core GRPC service. */
53+ /**
54+ * Implementation of the feast core GRPC service.
55+ */
5156@ Slf4j
5257@ GRpcService
5358public class CoreServiceImpl extends CoreServiceImplBase {
5459
55- @ Autowired private SpecService specService ;
56- @ Autowired private StatsDClient statsDClient ;
57- @ Autowired private JobCoordinatorService jobCoordinatorService ;
60+ @ Autowired
61+ private SpecService specService ;
62+ @ Autowired
63+ private JobCoordinatorService jobCoordinatorService ;
5864
5965 @ Override
6066 public void getFeastCoreVersion (
@@ -72,7 +78,7 @@ public void getFeatureSets(
7278 responseObserver .onNext (response );
7379 responseObserver .onCompleted ();
7480 } catch (RetrievalException | InvalidProtocolBufferException e ) {
75- responseObserver .onError (getRuntimeException ( e ) );
81+ responseObserver .onError (e );
7682 }
7783 }
7884
@@ -85,7 +91,7 @@ public void getStores(
8591 responseObserver .onNext (response );
8692 responseObserver .onCompleted ();
8793 } catch (RetrievalException e ) {
88- responseObserver .onError (getRuntimeException ( e ) );
94+ responseObserver .onError (e );
8995 }
9096 }
9197
@@ -124,17 +130,39 @@ public void applyFeatureSet(
124130 responseObserver .onNext (response );
125131 responseObserver .onCompleted ();
126132 } catch (Exception e ) {
127- responseObserver .onError (getRuntimeException ( e ) );
133+ responseObserver .onError (e );
128134 }
129135 }
130136
131- private StatusRuntimeException getRuntimeException (Exception e ) {
132- return new StatusRuntimeException (
133- Status .fromCode (Status .Code .INTERNAL ).withDescription (e .getMessage ()).withCause (e ));
134- }
137+ @ Override
138+ public void updateStore (UpdateStoreRequest request ,
139+ StreamObserver <UpdateStoreResponse > responseObserver ) {
140+ try {
141+ UpdateStoreResponse response = specService .updateStore (request );
142+ responseObserver .onNext (response );
143+ responseObserver .onCompleted ();
135144
136- private StatusRuntimeException getBadRequestException (Exception e ) {
137- return new StatusRuntimeException (
138- Status .fromCode (Status .Code .OUT_OF_RANGE ).withDescription (e .getMessage ()).withCause (e ));
145+ if (!response .getStatus ().equals (Status .NO_CHANGE )) {
146+ Set <FeatureSetSpec > featureSetSpecs = new HashSet <>();
147+ Store store = response .getStore ();
148+ for (Subscription subscription : store .getSubscriptionsList ()) {
149+ featureSetSpecs .addAll (
150+ specService .getFeatureSets (
151+ GetFeatureSetsRequest .Filter .newBuilder ()
152+ .setFeatureSetName (subscription .getName ())
153+ .setFeatureSetVersion (subscription .getVersion ())
154+ .build ())
155+ .getFeatureSetsList ()
156+ );
157+ }
158+ featureSetSpecs .stream ()
159+ .collect (Collectors .groupingBy (FeatureSetSpec ::getName ))
160+ .entrySet ()
161+ .stream ()
162+ .forEach (kv -> jobCoordinatorService .startOrUpdateJob (kv .getValue (), store ));
163+ }
164+ } catch (Exception e ) {
165+ responseObserver .onError (e );
166+ }
139167 }
140168}
0 commit comments