1616 */
1717package feast .core .grpc ;
1818
19- import com .google .common .collect .Lists ;
2019import com .google .protobuf .InvalidProtocolBufferException ;
2120import feast .core .CoreServiceGrpc .CoreServiceImplBase ;
2221import feast .core .CoreServiceProto .ApplyFeatureSetRequest ;
2827import feast .core .CoreServiceProto .ListFeatureSetsRequest ;
2928import feast .core .CoreServiceProto .ListFeatureSetsResponse ;
3029import feast .core .CoreServiceProto .ListStoresRequest ;
31- import feast .core .CoreServiceProto .ListStoresRequest .Filter ;
3230import feast .core .CoreServiceProto .ListStoresResponse ;
3331import feast .core .CoreServiceProto .UpdateStoreRequest ;
3432import feast .core .CoreServiceProto .UpdateStoreResponse ;
35- import feast .core .CoreServiceProto .UpdateStoreResponse .Status ;
36- import feast .core .FeatureSetProto .FeatureSetSpec ;
37- import feast .core .SourceProto ;
38- import feast .core .StoreProto .Store ;
39- import feast .core .StoreProto .Store .Subscription ;
4033import feast .core .exception .RetrievalException ;
4134import feast .core .grpc .interceptors .MonitoringInterceptor ;
42- import feast .core .service .JobCoordinatorService ;
4335import feast .core .service .SpecService ;
4436import io .grpc .StatusRuntimeException ;
4537import io .grpc .stub .StreamObserver ;
46- import java .util .HashSet ;
47- import java .util .Set ;
48- import java .util .stream .Collectors ;
4938import lombok .extern .slf4j .Slf4j ;
5039import org .lognet .springboot .grpc .GRpcService ;
5140import org .springframework .beans .factory .annotation .Autowired ;
52- import org .springframework .transaction .annotation .Transactional ;
5341
54- /**
55- * Implementation of the feast core GRPC service.
56- */
42+ /** Implementation of the feast core GRPC service. */
5743@ Slf4j
5844@ GRpcService (interceptors = {MonitoringInterceptor .class })
5945public class CoreServiceImpl extends CoreServiceImplBase {
6046
6147 private SpecService specService ;
62- private JobCoordinatorService jobCoordinatorService ;
6348
6449 @ Autowired
65- public CoreServiceImpl (SpecService specService , JobCoordinatorService jobCoordinatorService ) {
50+ public CoreServiceImpl (SpecService specService ) {
6651 this .specService = specService ;
67- this .jobCoordinatorService = jobCoordinatorService ;
6852 }
6953
7054 @ Override
@@ -118,31 +102,6 @@ public void applyFeatureSet(
118102 ApplyFeatureSetRequest request , StreamObserver <ApplyFeatureSetResponse > responseObserver ) {
119103 try {
120104 ApplyFeatureSetResponse response = specService .applyFeatureSet (request .getFeatureSet ());
121- ListStoresResponse stores = specService .listStores (Filter .newBuilder ().build ());
122- for (Store store : stores .getStoreList ()) {
123- Set <FeatureSetSpec > featureSetSpecs = new HashSet <>();
124- for (Subscription subscription : store .getSubscriptionsList ()) {
125- featureSetSpecs .addAll (
126- specService
127- .listFeatureSets (
128- ListFeatureSetsRequest .Filter .newBuilder ()
129- .setFeatureSetName (subscription .getName ())
130- .setFeatureSetVersion (subscription .getVersion ())
131- .build ())
132- .getFeatureSetsList ());
133- }
134- if (!featureSetSpecs .isEmpty () && featureSetSpecs .contains (response .getFeatureSet ())) {
135- // We use the response featureSet source because it contains the information
136- // about whether to default to the default feature stream or not
137- SourceProto .Source source = response .getFeatureSet ().getSource ();
138- featureSetSpecs =
139- featureSetSpecs .stream ()
140- .filter (fs -> fs .getSource ().equals (source ))
141- .collect (Collectors .toSet ());
142- jobCoordinatorService .startOrUpdateJob (
143- Lists .newArrayList (featureSetSpecs ), source , store );
144- }
145- }
146105 responseObserver .onNext (response );
147106 responseObserver .onCompleted ();
148107 } catch (Exception e ) {
@@ -158,30 +117,6 @@ public void updateStore(
158117 UpdateStoreResponse response = specService .updateStore (request );
159118 responseObserver .onNext (response );
160119 responseObserver .onCompleted ();
161-
162- if (!response .getStatus ().equals (Status .NO_CHANGE )) {
163- Set <FeatureSetSpec > featureSetSpecs = new HashSet <>();
164- Store store = response .getStore ();
165- for (Subscription subscription : store .getSubscriptionsList ()) {
166- featureSetSpecs .addAll (
167- specService
168- .listFeatureSets (
169- ListFeatureSetsRequest .Filter .newBuilder ()
170- .setFeatureSetName (subscription .getName ())
171- .setFeatureSetVersion (subscription .getVersion ())
172- .build ())
173- .getFeatureSetsList ());
174- }
175- if (featureSetSpecs .size () == 0 ) {
176- return ;
177- }
178- featureSetSpecs .stream ()
179- .collect (Collectors .groupingBy (FeatureSetSpec ::getSource ))
180- .entrySet ()
181- .stream ()
182- .forEach (
183- kv -> jobCoordinatorService .startOrUpdateJob (kv .getValue (), kv .getKey (), store ));
184- }
185120 } catch (Exception e ) {
186121 log .error ("Exception has occurred in UpdateStore method: " , e );
187122 responseObserver .onError (e );
0 commit comments