3434import feast .serving .registry .RegistryRepository ;
3535import feast .serving .util .Metrics ;
3636import feast .storage .api .retriever .OnlineRetrieverV2 ;
37- import io .grpc .Status ;
3837import io .opentracing .Span ;
3938import io .opentracing .Tracer ;
4039import java .util .*;
@@ -51,6 +50,11 @@ public class OnlineServingServiceV2 implements ServingServiceV2 {
5150 private final OnlineTransformationService onlineTransformationService ;
5251 private final String project ;
5352
53+ public static final String DUMMY_ENTITY_ID = "__dummy_id" ;
54+ public static final String DUMMY_ENTITY_VAL = "" ;
55+ public static final ValueProto .Value DUMMY_ENTITY_VALUE =
56+ ValueProto .Value .newBuilder ().setStringVal (DUMMY_ENTITY_VAL ).build ();
57+
5458 public OnlineServingServiceV2 (
5559 OnlineRetrieverV2 retriever ,
5660 Tracer tracer ,
@@ -103,31 +107,18 @@ public ServingAPIProto.GetOnlineFeaturesResponse getOnlineFeatures(
103107
104108 List <Map <String , ValueProto .Value >> entityRows = getEntityRows (request );
105109
106- List <String > entityNames ;
107- if (retrievedFeatureReferences .size () > 0 ) {
108- entityNames = this .registryRepository .getEntitiesList (retrievedFeatureReferences .get (0 ));
109- } else {
110- throw new RuntimeException ("Requested features list must not be empty" );
111- }
112-
113110 Span storageRetrievalSpan = tracer .buildSpan ("storageRetrieval" ).start ();
114111 if (storageRetrievalSpan != null ) {
115112 storageRetrievalSpan .setTag ("entities" , entityRows .size ());
116113 storageRetrievalSpan .setTag ("features" , retrievedFeatureReferences .size ());
117114 }
115+
118116 List <List <feast .storage .api .retriever .Feature >> features =
119- retriever . getOnlineFeatures ( entityRows , retrievedFeatureReferences , entityNames );
117+ retrieveFeatures ( retrievedFeatureReferences , entityRows );
120118
121119 if (storageRetrievalSpan != null ) {
122120 storageRetrievalSpan .finish ();
123121 }
124- if (features .size () != entityRows .size ()) {
125- throw Status .INTERNAL
126- .withDescription (
127- "The no. of FeatureRow obtained from OnlineRetriever"
128- + "does not match no. of entityRow passed." )
129- .asRuntimeException ();
130- }
131122
132123 Span postProcessingSpan = tracer .buildSpan ("postProcessing" ).start ();
133124
@@ -255,6 +246,84 @@ private List<Map<String, ValueProto.Value>> getEntityRows(
255246 return entityRows ;
256247 }
257248
249+ private List <List <feast .storage .api .retriever .Feature >> retrieveFeatures (
250+ List <FeatureReferenceV2 > featureReferences , List <Map <String , ValueProto .Value >> entityRows ) {
251+ // Prepare feature reference to index mapping. This mapping will be used to arrange the
252+ // retrieved features to the same order as in the input.
253+ if (featureReferences .isEmpty ()) {
254+ throw new RuntimeException ("Requested features list must not be empty." );
255+ }
256+ Map <FeatureReferenceV2 , Integer > featureReferenceToIndexMap =
257+ new HashMap <>(featureReferences .size ());
258+ for (int i = 0 ; i < featureReferences .size (); i ++) {
259+ FeatureReferenceV2 featureReference = featureReferences .get (i );
260+ if (featureReferenceToIndexMap .containsKey (featureReference )) {
261+ throw new RuntimeException (
262+ String .format (
263+ "Found duplicate features %s:%s." ,
264+ featureReference .getFeatureViewName (), featureReference .getFeatureName ()));
265+ }
266+ featureReferenceToIndexMap .put (featureReference , i );
267+ }
268+
269+ // Create placeholders for retrieved features.
270+ List <List <feast .storage .api .retriever .Feature >> features = new ArrayList <>(entityRows .size ());
271+ for (int i = 0 ; i < entityRows .size (); i ++) {
272+ List <feast .storage .api .retriever .Feature > featuresPerEntity =
273+ new ArrayList <>(featureReferences .size ());
274+ for (int j = 0 ; j < featureReferences .size (); j ++) {
275+ featuresPerEntity .add (null );
276+ }
277+ features .add (featuresPerEntity );
278+ }
279+
280+ // Group feature references by join keys.
281+ Map <String , List <FeatureReferenceV2 >> groupNameToFeatureReferencesMap =
282+ featureReferences .stream ()
283+ .collect (
284+ Collectors .groupingBy (
285+ featureReference ->
286+ this .registryRepository .getEntitiesList (featureReference ).stream ()
287+ .map (this .registryRepository ::getEntityJoinKey )
288+ .sorted ()
289+ .collect (Collectors .joining ("," ))));
290+
291+ // Retrieve features one group at a time.
292+ for (List <FeatureReferenceV2 > featureReferencesPerGroup :
293+ groupNameToFeatureReferencesMap .values ()) {
294+ List <String > entityNames =
295+ this .registryRepository .getEntitiesList (featureReferencesPerGroup .get (0 ));
296+ List <Map <String , ValueProto .Value >> entityRowsPerGroup = new ArrayList <>(entityRows .size ());
297+ for (Map <String , ValueProto .Value > entityRow : entityRows ) {
298+ Map <String , ValueProto .Value > entityRowPerGroup = new HashMap <>();
299+ entityNames .stream ()
300+ .map (this .registryRepository ::getEntityJoinKey )
301+ .forEach (
302+ joinKey -> {
303+ if (joinKey .equals (DUMMY_ENTITY_ID )) {
304+ entityRowPerGroup .put (joinKey , DUMMY_ENTITY_VALUE );
305+ } else {
306+ ValueProto .Value value = entityRow .get (joinKey );
307+ if (value != null ) {
308+ entityRowPerGroup .put (joinKey , value );
309+ }
310+ }
311+ });
312+ entityRowsPerGroup .add (entityRowPerGroup );
313+ }
314+ List <List <feast .storage .api .retriever .Feature >> featuresPerGroup =
315+ retriever .getOnlineFeatures (entityRowsPerGroup , featureReferencesPerGroup , entityNames );
316+ for (int i = 0 ; i < featuresPerGroup .size (); i ++) {
317+ for (int j = 0 ; j < featureReferencesPerGroup .size (); j ++) {
318+ int k = featureReferenceToIndexMap .get (featureReferencesPerGroup .get (j ));
319+ features .get (i ).set (k , featuresPerGroup .get (i ).get (j ));
320+ }
321+ }
322+ }
323+
324+ return features ;
325+ }
326+
258327 private void populateOnDemandFeatures (
259328 List <FeatureReferenceV2 > onDemandFeatureReferences ,
260329 List <FeatureReferenceV2 > onDemandFeatureSources ,
0 commit comments