@@ -6,8 +6,10 @@ import (
66 "fmt"
77 "github.com/feast-dev/feast/go/protos/feast/serving"
88 "github.com/feast-dev/feast/go/protos/feast/types"
9- durationpb "google.golang.org/protobuf/types/known/durationpb"
10- timestamppb "google.golang.org/protobuf/types/known/timestamppb"
9+ "google.golang.org/grpc/codes"
10+ "google.golang.org/grpc/status"
11+ "google.golang.org/protobuf/types/known/durationpb"
12+ "google.golang.org/protobuf/types/known/timestamppb"
1113 "sort"
1214 "strings"
1315)
@@ -23,7 +25,10 @@ type entityKeyRow struct {
2325 rowIndices []int
2426}
2527
26- type ParsedKind struct {
28+ // A Features struct specifies a list of features to be retrieved from the online store. These features
29+ // can be specified either as a list of string feature references or as a feature service. String
30+ // feature references must have format "feature_view:feature", e.g. "customer_fv:daily_transactions".
31+ type Features struct {
2732 features []string
2833 featureService * FeatureService
2934}
@@ -47,7 +52,7 @@ type GroupedFeaturesPerEntitySet struct {
4752// NewFeatureStore constructs a feature store fat client using the
4853// repo config (contents of feature_store.yaml converted to JSON map).
4954func NewFeatureStore (config * RepoConfig ) (* FeatureStore , error ) {
50- onlineStore , err := getOnlineStore (config )
55+ onlineStore , err := NewOnlineStore (config )
5156 if err != nil {
5257 return nil , err
5358 }
@@ -67,14 +72,13 @@ func NewFeatureStore(config *RepoConfig) (*FeatureStore, error) {
6772
6873// TODO: Review all functions that use ODFV and Request FV since these have not been tested
6974func (fs * FeatureStore ) GetOnlineFeatures (ctx context.Context , request * serving.GetOnlineFeaturesRequest ) (* serving.GetOnlineFeaturesResponse , error ) {
70- kind := request .GetKind ()
7175 fullFeatureNames := request .GetFullFeatureNames ()
72- parsedKind , err := fs .parseKind ( kind )
76+ features , err := fs .parseFeatures ( request . GetKind () )
7377 if err != nil {
7478 return nil , err
7579 }
7680
77- featureRefs , err := fs .getFeatures ( parsedKind , true )
81+ featureRefs , err := fs .getFeatureRefs ( features )
7882 if err != nil {
7983 return nil , err
8084 }
@@ -88,13 +92,11 @@ func (fs *FeatureStore) GetOnlineFeatures(ctx context.Context, request *serving.
8892 return nil , err
8993 }
9094
91- fvs , requestedFeatureViews , requestedRequestFeatureViews , requestedOnDemandFeatureViews , err := fs .getFeatureViewsToUse (parsedKind , true , false )
95+ fvs , requestedFeatureViews , requestedRequestFeatureViews , requestedOnDemandFeatureViews , err := fs .getFeatureViewsToUse (features , true , false )
9296
93- // TODO (Ly): Remove this BLOCK once odfv is supported
9497 if len (requestedRequestFeatureViews )+ len (requestedOnDemandFeatureViews ) > 0 {
95- return nil , errors . New ( "ODFV is not supported in this iteration, please wait! " )
98+ return nil , status . Errorf ( codes . InvalidArgument , "on demand feature views are currently not supported " )
9699 }
97- // END BLOCK
98100
99101 if err != nil {
100102 return nil , err
@@ -124,7 +126,7 @@ func (fs *FeatureStore) GetOnlineFeatures(ctx context.Context, request *serving.
124126 // requestDataFeatures[entityName] = vals
125127 } else {
126128 if joinKey , ok := entityNameToJoinKeyMap [entityName ]; ! ok {
127- return nil , errors . New ( fmt .Sprintf ("entityNotFoundException: %s\n %v" , entityName , entityNameToJoinKeyMap ) )
129+ return nil , fmt .Errorf ("entityNotFoundException: %s\n %v" , entityName , entityNameToJoinKeyMap )
128130 } else {
129131 responseEntities [joinKey ] = vals
130132 }
@@ -207,42 +209,36 @@ func (fs *FeatureStore) DestructOnlineStore() {
207209 fs .onlineStore .Destruct ()
208210}
209211
210- func (fs * FeatureStore ) parseKind (kind interface {}) (* ParsedKind , error ) {
212+ // parseFeatures parses the kind field of a GetOnlineFeaturesRequest protobuf message
213+ // and populates a Features struct with the result.
214+ func (fs * FeatureStore ) parseFeatures (kind interface {}) (* Features , error ) {
211215 if featureList , ok := kind .(* serving.GetOnlineFeaturesRequest_Features ); ok {
212- return & ParsedKind {features : featureList .Features .GetVal (), featureService : nil }, nil
216+ return & Features {features : featureList .Features .GetVal (), featureService : nil }, nil
213217 }
214218 if featureServiceRequest , ok := kind .(* serving.GetOnlineFeaturesRequest_FeatureService ); ok {
215219 featureService , err := fs .registry .getFeatureService (fs .config .Project , featureServiceRequest .FeatureService )
216220 if err != nil {
217221 return nil , err
218222 }
219- return & ParsedKind {features : nil , featureService : featureService }, nil
223+ return & Features {features : nil , featureService : featureService }, nil
220224 }
221225 return nil , errors .New ("cannot parse kind from GetOnlineFeaturesRequest" )
222226}
223227
224- /*
225- This function returns all feature references from GetOnlineFeaturesRequest.
226- If a list of feature references is passed, return it.
227- Otherwise, FeatureService was passed, parse this feature service to get a list of FeatureViewProjection and return feature
228- references from this list
229- */
230-
231- func (fs * FeatureStore ) getFeatures (parsedKind * ParsedKind , allowCache bool ) ([]string , error ) {
232-
233- if parsedKind .featureService != nil {
234-
228+ // getFeatureRefs extracts a list of feature references from a Features struct.
229+ func (fs * FeatureStore ) getFeatureRefs (features * Features ) ([]string , error ) {
230+ if features .featureService != nil {
235231 var featureViewName string
236- features := make ([]string , 0 )
237- for _ , featureProjection := range parsedKind .featureService .projections {
232+ featureRefs := make ([]string , 0 )
233+ for _ , featureProjection := range features .featureService .projections {
238234 featureViewName = featureProjection .nameToUse ()
239235 for _ , feature := range featureProjection .features {
240- features = append (features , fmt .Sprintf ("%s:%s" , featureViewName , feature .name ))
236+ featureRefs = append (featureRefs , fmt .Sprintf ("%s:%s" , featureViewName , feature .name ))
241237 }
242238 }
243- return features , nil
239+ return featureRefs , nil
244240 } else {
245- return parsedKind .features , nil
241+ return features .features , nil
246242 }
247243}
248244
@@ -257,8 +253,7 @@ func (fs *FeatureStore) getFeatures(parsedKind *ParsedKind, allowCache bool) ([]
257253 retrieving all feature views. Similar argument to FeatureService applies.
258254
259255*/
260- func (fs * FeatureStore ) getFeatureViewsToUse (parsedKind * ParsedKind , allowCache , hideDummyEntity bool ) (map [string ]* FeatureView , map [* FeatureView ][]string , []* RequestFeatureView , []* OnDemandFeatureView , error ) {
261-
256+ func (fs * FeatureStore ) getFeatureViewsToUse (features * Features , allowCache , hideDummyEntity bool ) (map [string ]* FeatureView , map [* FeatureView ][]string , []* RequestFeatureView , []* OnDemandFeatureView , error ) {
262257 fvs := make (map [string ]* FeatureView )
263258 requestFvs := make (map [string ]* RequestFeatureView )
264259 odFvs := make (map [string ]* OnDemandFeatureView )
@@ -278,8 +273,8 @@ func (fs *FeatureStore) getFeatureViewsToUse(parsedKind *ParsedKind, allowCache,
278273 odFvs [onDemandFeatureView .base .name ] = onDemandFeatureView
279274 }
280275
281- if parsedKind .featureService != nil {
282- featureService := parsedKind .featureService
276+ if features .featureService != nil {
277+ featureService := features .featureService
283278
284279 fvsToUse := make (map [* FeatureView ][]string )
285280 requestFvsToUse := make ([]* RequestFeatureView , 0 )
@@ -312,9 +307,9 @@ func (fs *FeatureStore) getFeatureViewsToUse(parsedKind *ParsedKind, allowCache,
312307 }
313308 odFvsToUse = append (odFvsToUse , odFv .NewOnDemandFeatureViewFromBase (base ))
314309 } else {
315- return nil , nil , nil , nil , errors . New ( fmt .Sprintf ("the provided feature service %s contains a reference to a feature view" +
316- "%s which doesn't exist. Please make sure that you have created the feature view" +
317- "%s and that you have registered it by running \" apply\" . " , featureService .name , featureViewName , featureViewName ) )
310+ return nil , nil , nil , nil , fmt .Errorf ("the provided feature service %s contains a reference to a feature view" +
311+ "%s which doesn't exist, please make sure that you have created the feature view" +
312+ "%s and that you have registered it by running \" apply\" " , featureService .name , featureViewName , featureViewName )
318313 }
319314 }
320315 return fvs , fvsToUse , requestFvsToUse , odFvsToUse , nil
@@ -324,7 +319,7 @@ func (fs *FeatureStore) getFeatureViewsToUse(parsedKind *ParsedKind, allowCache,
324319 requestFvsToUse := make ([]* RequestFeatureView , 0 )
325320 odFvsToUse := make ([]* OnDemandFeatureView , 0 )
326321
327- for _ , featureRef := range parsedKind .features {
322+ for _ , featureRef := range features .features {
328323 featureViewName , featureName , err := parseFeatureReference (featureRef )
329324 if err != nil {
330325 return nil , nil , nil , nil , err
@@ -336,8 +331,8 @@ func (fs *FeatureStore) getFeatureViewsToUse(parsedKind *ParsedKind, allowCache,
336331 } else if odFv , ok := odFvs [featureViewName ]; ok {
337332 odFvsToUse = append (odFvsToUse , odFv )
338333 } else {
339- return nil , nil , nil , nil , errors . New ( fmt .Sprintf ("feature view %s doesn't exist. Please make sure that you have created the" +
340- " feature view %s and that you have registered it by running \" apply\" . " , featureViewName , featureViewName ) )
334+ return nil , nil , nil , nil , fmt .Errorf ("feature view %s doesn't exist, please make sure that you have created the" +
335+ " feature view %s and that you have registered it by running \" apply\" " , featureViewName , featureViewName )
341336 }
342337 }
343338
@@ -395,7 +390,7 @@ func (fs *FeatureStore) validateEntityValues(joinKeyValues map[string]*types.Rep
395390 numRows = len (col .Val )
396391 }
397392 if len (setOfRowLengths ) > 1 {
398- return 0 , errors .New ("valueError: All entity rows must have the same columns. " )
393+ return 0 , errors .New ("valueError: All entity rows must have the same columns" )
399394 }
400395 return numRows , nil
401396}
@@ -490,7 +485,7 @@ func (fs *FeatureStore) ensureRequestedDataExist(neededRequestData map[string]st
490485 missingFeatures = append (missingFeatures , feature )
491486 }
492487 }
493- return errors . New ( fmt .Sprintf ("requestDataNotFoundInEntityRowsException: %s" , strings .Join (missingFeatures , ", " ) ))
488+ return fmt .Errorf ("requestDataNotFoundInEntityRowsException: %s" , strings .Join (missingFeatures , ", " ))
494489 }
495490 return nil
496491}
0 commit comments