diff --git a/examples/java-demo/feature_repo/driver_repo.py b/examples/java-demo/feature_repo/driver_repo.py index c91e5a40be..e17a5d9cf8 100644 --- a/examples/java-demo/feature_repo/driver_repo.py +++ b/examples/java-demo/feature_repo/driver_repo.py @@ -7,14 +7,14 @@ from google.protobuf.duration_pb2 import Duration from feast.field import Field -from feast import Entity, Feature, BatchFeatureView, FileSource, ValueType +from feast import Entity, Feature, BatchFeatureView, FileSource driver_hourly_stats = FileSource( path="data/driver_stats_with_string.parquet", timestamp_field="event_timestamp", created_timestamp_column="created", ) -driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id",) +driver = Entity(name="driver_id", description="driver id",) driver_hourly_stats_view = BatchFeatureView( name="driver_hourly_stats", entities=["driver_id"], diff --git a/go/embedded/online_features.go b/go/embedded/online_features.go index 7cd1e4ed81..710f60dca8 100644 --- a/go/embedded/online_features.go +++ b/go/embedded/online_features.go @@ -80,12 +80,6 @@ func (s *OnlineFeatureService) GetEntityTypesMap(featureRefs []string) (map[stri viewNames[viewName] = nil } - entities, _ := s.fs.ListEntities(true) - entitiesByName := make(map[string]*model.Entity) - for _, entity := range entities { - entitiesByName[entity.Name] = entity - } - joinKeyTypes := make(map[string]int32) for viewName := range viewNames { @@ -94,9 +88,8 @@ func (s *OnlineFeatureService) GetEntityTypesMap(featureRefs []string) (map[stri // skip on demand feature views continue } - for _, entityName := range view.Entities { - entity := entitiesByName[entityName] - joinKeyTypes[entity.JoinKey] = int32(entity.ValueType.Number()) + for _, entityColumn := range view.EntityColumns { + joinKeyTypes[entityColumn.Name] = int32(entityColumn.Dtype.Number()) } } @@ -111,21 +104,14 @@ func (s *OnlineFeatureService) GetEntityTypesMapByFeatureService(featureServiceN joinKeyTypes := make(map[string]int32) - entities, _ := s.fs.ListEntities(true) - entitiesByName := make(map[string]*model.Entity) - for _, entity := range entities { - entitiesByName[entity.Name] = entity - } - for _, projection := range featureService.Projections { view, err := s.fs.GetFeatureView(projection.Name, true) if err != nil { // skip on demand feature views continue } - for _, entityName := range view.Entities { - entity := entitiesByName[entityName] - joinKeyTypes[entity.JoinKey] = int32(entity.ValueType.Number()) + for _, entityColumn := range view.EntityColumns { + joinKeyTypes[entityColumn.Name] = int32(entityColumn.Dtype.Number()) } } diff --git a/go/internal/feast/featurestore.go b/go/internal/feast/featurestore.go index 4ecd781b74..b0fc987fb4 100644 --- a/go/internal/feast/featurestore.go +++ b/go/internal/feast/featurestore.go @@ -132,7 +132,7 @@ func (fs *FeatureStore) GetOnlineFeatures( if entitylessCase { dummyEntityColumn := &prototypes.RepeatedValue{Val: make([]*prototypes.Value, numRows)} for index := 0; index < numRows; index++ { - dummyEntityColumn.Val[index] = &model.DUMMY_ENTITY + dummyEntityColumn.Val[index] = &model.DUMMY_ENTITY_VALUE } joinKeyToEntityValues[model.DUMMY_ENTITY_ID] = dummyEntityColumn } @@ -272,7 +272,7 @@ func (fs *FeatureStore) GetFeatureView(featureViewName string, hideDummyEntity b return nil, err } if fv.HasEntity(model.DUMMY_ENTITY_NAME) && hideDummyEntity { - fv.Entities = []string{} + fv.EntityNames = []string{} } return fv, nil } diff --git a/go/internal/feast/model/basefeatureview.go b/go/internal/feast/model/basefeatureview.go index 28ef7231fd..1bdf614c25 100644 --- a/go/internal/feast/model/basefeatureview.go +++ b/go/internal/feast/model/basefeatureview.go @@ -8,15 +8,15 @@ import ( type BaseFeatureView struct { Name string - Features []*Feature + Features []*Field Projection *FeatureViewProjection } func NewBaseFeatureView(name string, featureProtos []*core.FeatureSpecV2) *BaseFeatureView { base := &BaseFeatureView{Name: name} - features := make([]*Feature, len(featureProtos)) + features := make([]*Field, len(featureProtos)) for index, featureSpecV2 := range featureProtos { - features[index] = NewFeatureFromProto(featureSpecV2) + features[index] = NewFieldFromProto(featureSpecV2) } base.Features = features base.Projection = NewFeatureViewProjectionFromDefinition(base) @@ -43,7 +43,7 @@ func (fv *BaseFeatureView) WithProjection(projection *FeatureViewProjection) (*B } func (fv *BaseFeatureView) ProjectWithFeatures(featureNames []string) *FeatureViewProjection { - features := make([]*Feature, 0) + features := make([]*Field, 0) for _, feature := range fv.Features { for _, allowedFeatureName := range featureNames { if feature.Name == allowedFeatureName { diff --git a/go/internal/feast/model/entity.go b/go/internal/feast/model/entity.go index ac3a5d5f26..5a09edb655 100644 --- a/go/internal/feast/model/entity.go +++ b/go/internal/feast/model/entity.go @@ -2,18 +2,16 @@ package model import ( "github.com/feast-dev/feast/go/protos/feast/core" - "github.com/feast-dev/feast/go/protos/feast/types" ) type Entity struct { - Name string - ValueType types.ValueType_Enum - JoinKey string + Name string + JoinKey string } func NewEntityFromProto(proto *core.Entity) *Entity { - return &Entity{Name: proto.Spec.Name, - ValueType: proto.Spec.ValueType, - JoinKey: proto.Spec.JoinKey, + return &Entity{ + Name: proto.Spec.Name, + JoinKey: proto.Spec.JoinKey, } } diff --git a/go/internal/feast/model/featureview.go b/go/internal/feast/model/featureview.go index 6c198f9994..ceb3736f99 100644 --- a/go/internal/feast/model/featureview.go +++ b/go/internal/feast/model/featureview.go @@ -13,12 +13,13 @@ const ( DUMMY_ENTITY_VAL = "" ) -var DUMMY_ENTITY types.Value = types.Value{Val: &types.Value_StringVal{StringVal: DUMMY_ENTITY_VAL}} +var DUMMY_ENTITY_VALUE types.Value = types.Value{Val: &types.Value_StringVal{StringVal: DUMMY_ENTITY_VAL}} type FeatureView struct { - Base *BaseFeatureView - Ttl *durationpb.Duration - Entities []string + Base *BaseFeatureView + Ttl *durationpb.Duration + EntityNames []string + EntityColumns []*Field } func NewFeatureViewFromProto(proto *core.FeatureView) *FeatureView { @@ -26,25 +27,30 @@ func NewFeatureViewFromProto(proto *core.FeatureView) *FeatureView { Ttl: &(*proto.Spec.Ttl), } if len(proto.Spec.Entities) == 0 { - featureView.Entities = []string{DUMMY_ENTITY_NAME} + featureView.EntityNames = []string{DUMMY_ENTITY_NAME} } else { - featureView.Entities = proto.Spec.Entities + featureView.EntityNames = proto.Spec.Entities } + entityColumns := make([]*Field, len(proto.Spec.EntityColumns)) + for i, entityColumn := range proto.Spec.EntityColumns { + entityColumns[i] = NewFieldFromProto(entityColumn) + } + featureView.EntityColumns = entityColumns return featureView } -func (fs *FeatureView) NewFeatureViewFromBase(base *BaseFeatureView) *FeatureView { - ttl := durationpb.Duration{Seconds: fs.Ttl.Seconds, Nanos: fs.Ttl.Nanos} +func (fv *FeatureView) NewFeatureViewFromBase(base *BaseFeatureView) *FeatureView { + ttl := durationpb.Duration{Seconds: fv.Ttl.Seconds, Nanos: fv.Ttl.Nanos} featureView := &FeatureView{Base: base, - Ttl: &ttl, - Entities: fs.Entities, + Ttl: &ttl, + EntityNames: fv.EntityNames, } return featureView } -func (fs *FeatureView) HasEntity(lookup string) bool { - for _, entityName := range fs.Entities { - if entityName == lookup { +func (fv *FeatureView) HasEntity(name string) bool { + for _, entityName := range fv.EntityNames { + if entityName == name { return true } } diff --git a/go/internal/feast/model/featureviewprojection.go b/go/internal/feast/model/featureviewprojection.go index e80e8844ed..fe54774ff1 100644 --- a/go/internal/feast/model/featureviewprojection.go +++ b/go/internal/feast/model/featureviewprojection.go @@ -7,7 +7,7 @@ import ( type FeatureViewProjection struct { Name string NameAlias string - Features []*Feature + Features []*Field JoinKeyMap map[string]string } @@ -24,9 +24,9 @@ func NewFeatureViewProjectionFromProto(proto *core.FeatureViewProjection) *Featu JoinKeyMap: proto.JoinKeyMap, } - features := make([]*Feature, len(proto.FeatureColumns)) + features := make([]*Field, len(proto.FeatureColumns)) for index, featureSpecV2 := range proto.FeatureColumns { - features[index] = NewFeatureFromProto(featureSpecV2) + features[index] = NewFieldFromProto(featureSpecV2) } featureProjection.Features = features return featureProjection diff --git a/go/internal/feast/model/feature.go b/go/internal/feast/model/field.go similarity index 63% rename from go/internal/feast/model/feature.go rename to go/internal/feast/model/field.go index d833a8901b..4f72d34686 100644 --- a/go/internal/feast/model/feature.go +++ b/go/internal/feast/model/field.go @@ -5,13 +5,14 @@ import ( "github.com/feast-dev/feast/go/protos/feast/types" ) -type Feature struct { +type Field struct { Name string Dtype types.ValueType_Enum } -func NewFeatureFromProto(proto *core.FeatureSpecV2) *Feature { - return &Feature{Name: proto.Name, +func NewFieldFromProto(proto *core.FeatureSpecV2) *Field { + return &Field{ + Name: proto.Name, Dtype: proto.ValueType, } } diff --git a/go/internal/feast/onlineserving/serving.go b/go/internal/feast/onlineserving/serving.go index 1d0567c354..e2a2df923b 100644 --- a/go/internal/feast/onlineserving/serving.go +++ b/go/internal/feast/onlineserving/serving.go @@ -251,7 +251,7 @@ func GetEntityMaps(requestedFeatureViews []*FeatureViewAndRefs, entities []*mode joinKeyToAliasMap = map[string]string{} } - for _, entityName := range featureView.Entities { + for _, entityName := range featureView.EntityNames { joinKey := entitiesByName[entityName].JoinKey entityNameToJoinKeyMap[entityName] = joinKey @@ -518,7 +518,7 @@ func GroupFeatureRefs(requestedFeatureViews []*FeatureViewAndRefs, joinKeys := make([]string, 0) fv := featuresAndView.View featureNames := featuresAndView.FeatureRefs - for _, entityName := range fv.Entities { + for _, entityName := range fv.EntityNames { joinKeys = append(joinKeys, entityNameToJoinKeyMap[entityName]) } diff --git a/go/internal/feast/onlineserving/serving_test.go b/go/internal/feast/onlineserving/serving_test.go index 0a00f546f9..bd4e45a21e 100644 --- a/go/internal/feast/onlineserving/serving_test.go +++ b/go/internal/feast/onlineserving/serving_test.go @@ -20,19 +20,19 @@ func TestGroupingFeatureRefs(t *testing.T) { NameAlias: "aliasViewA", }, }, - Entities: []string{"driver", "customer"}, + EntityNames: []string{"driver", "customer"}, } viewB := &model.FeatureView{ - Base: &model.BaseFeatureView{Name: "viewB"}, - Entities: []string{"driver", "customer"}, + Base: &model.BaseFeatureView{Name: "viewB"}, + EntityNames: []string{"driver", "customer"}, } viewC := &model.FeatureView{ - Base: &model.BaseFeatureView{Name: "viewC"}, - Entities: []string{"driver"}, + Base: &model.BaseFeatureView{Name: "viewC"}, + EntityNames: []string{"driver"}, } viewD := &model.FeatureView{ - Base: &model.BaseFeatureView{Name: "viewD"}, - Entities: []string{"customer"}, + Base: &model.BaseFeatureView{Name: "viewD"}, + EntityNames: []string{"customer"}, } refGroups, _ := GroupFeatureRefs( []*FeatureViewAndRefs{ @@ -105,11 +105,11 @@ func TestGroupingFeatureRefsWithJoinKeyAliases(t *testing.T) { JoinKeyMap: map[string]string{"location_id": "destination_id"}, }, }, - Entities: []string{"location"}, + EntityNames: []string{"location"}, } viewB := &model.FeatureView{ - Base: &model.BaseFeatureView{Name: "viewB"}, - Entities: []string{"location"}, + Base: &model.BaseFeatureView{Name: "viewB"}, + EntityNames: []string{"location"}, } refGroups, _ := GroupFeatureRefs( @@ -164,7 +164,7 @@ func TestGroupingFeatureRefsWithMissingKey(t *testing.T) { JoinKeyMap: map[string]string{"location_id": "destination_id"}, }, }, - Entities: []string{"location"}, + EntityNames: []string{"location"}, } _, err := GroupFeatureRefs( diff --git a/go/internal/feast/server/logging/featureserviceschema.go b/go/internal/feast/server/logging/featureserviceschema.go index 5047346c2c..2779982fc0 100644 --- a/go/internal/feast/server/logging/featureserviceschema.go +++ b/go/internal/feast/server/logging/featureserviceschema.go @@ -52,13 +52,12 @@ func generateSchema(featureService *model.FeatureService, entityMap map[string]* features = append(features, fullFeatureName) allFeatureTypes[fullFeatureName] = f.Dtype } - for _, entityName := range fv.Entities { - entity := entityMap[entityName] + for _, entityColumn := range fv.EntityColumns { var joinKey string - if joinKeyAlias, ok := featureProjection.JoinKeyMap[entity.JoinKey]; ok { + if joinKeyAlias, ok := featureProjection.JoinKeyMap[entityColumn.Name]; ok { joinKey = joinKeyAlias } else { - joinKey = entity.JoinKey + joinKey = entityColumn.Name } if _, ok := joinKeysSet[joinKey]; !ok { @@ -66,7 +65,7 @@ func generateSchema(featureService *model.FeatureService, entityMap map[string]* } joinKeysSet[joinKey] = nil - entityJoinKeyToType[joinKey] = entity.ValueType + entityJoinKeyToType[joinKey] = entityColumn.Dtype } } else if odFv, ok := odFvMap[featureViewName]; ok { for _, f := range featureProjection.Features { diff --git a/go/internal/feast/server/logging/featureserviceschema_test.go b/go/internal/feast/server/logging/featureserviceschema_test.go index efcd5ec7fc..6fa1c12e24 100644 --- a/go/internal/feast/server/logging/featureserviceschema_test.go +++ b/go/internal/feast/server/logging/featureserviceschema_test.go @@ -74,9 +74,10 @@ func TestSchemaRetrievalIgnoresEntitiesNotInFeatureService(t *testing.T) { featureService, entities, fvs, odfvs := InitializeFeatureRepoVariablesForTest() entityMap, fvMap, odFvMap := buildFCOMaps(entities, fvs, odfvs) - //Remove entities in featureservice + // Remove entities in featureservice for _, featureView := range fvs { - featureView.Entities = []string{} + featureView.EntityNames = []string{} + featureView.EntityColumns = []*model.Field{} } schema, err := generateSchema(featureService, entityMap, fvMap, odFvMap) @@ -126,65 +127,69 @@ func TestSchemaUsesOrderInFeatureService(t *testing.T) { // Initialize all dummy featureservice, entities and featureviews/on demand featureviews for testing. func InitializeFeatureRepoVariablesForTest() (*model.FeatureService, []*model.Entity, []*model.FeatureView, []*model.OnDemandFeatureView) { - f1 := test.CreateNewFeature( + f1 := test.CreateNewField( "int64", types.ValueType_INT64, ) - f2 := test.CreateNewFeature( + f2 := test.CreateNewField( "float32", types.ValueType_FLOAT, ) projection1 := test.CreateNewFeatureViewProjection( "featureView1", "", - []*model.Feature{f1, f2}, + []*model.Field{f1, f2}, map[string]string{}, ) baseFeatureView1 := test.CreateBaseFeatureView( "featureView1", - []*model.Feature{f1, f2}, + []*model.Field{f1, f2}, projection1, ) - featureView1 := test.CreateFeatureView(baseFeatureView1, nil, []string{"driver_id"}) - entity1 := test.CreateNewEntity("driver_id", types.ValueType_INT64, "driver_id") - f3 := test.CreateNewFeature( + entity1 := test.CreateNewEntity("driver_id", "driver_id") + entitycolumn1 := test.CreateNewField( + "driver_id", + types.ValueType_INT64, + ) + featureView1 := test.CreateFeatureView(baseFeatureView1, nil, []string{"driver_id"}, []*model.Field{entitycolumn1}) + f3 := test.CreateNewField( "int32", types.ValueType_INT32, ) - f4 := test.CreateNewFeature( + f4 := test.CreateNewField( "double", types.ValueType_DOUBLE, ) projection2 := test.CreateNewFeatureViewProjection( "featureView2", "", - []*model.Feature{f3, f4}, + []*model.Field{f3, f4}, map[string]string{}, ) baseFeatureView2 := test.CreateBaseFeatureView( "featureView2", - []*model.Feature{f3, f4}, + []*model.Field{f3, f4}, projection2, ) - featureView2 := test.CreateFeatureView(baseFeatureView2, nil, []string{"driver_id"}) + featureView2 := test.CreateFeatureView(baseFeatureView2, nil, []string{"driver_id"}, []*model.Field{entitycolumn1}) - f5 := test.CreateNewFeature( + f5 := test.CreateNewField( "odfv_f1", types.ValueType_INT32, ) - f6 := test.CreateNewFeature( + f6 := test.CreateNewField( "odfv_f2", types.ValueType_DOUBLE, ) projection3 := test.CreateNewFeatureViewProjection( "od_bf1", "", - []*model.Feature{f5, f6}, + []*model.Field{f5, f6}, map[string]string{}, ) od_bf1 := test.CreateBaseFeatureView( "od_bf1", - []*model.Feature{f5, f6}, + []*model.Field{f5, f6}, projection3, ) odfv := model.NewOnDemandFeatureViewFromBase(od_bf1) diff --git a/go/internal/test/go_integration_test_utils.go b/go/internal/test/go_integration_test_utils.go index eb727ba1db..275edc7b98 100644 --- a/go/internal/test/go_integration_test_utils.go +++ b/go/internal/test/go_integration_test_utils.go @@ -193,7 +193,7 @@ func GetProtoFromRecord(rec arrow.Record) (map[string]*types.RepeatedValue, erro return r, nil } -func CreateBaseFeatureView(name string, features []*model.Feature, projection *model.FeatureViewProjection) *model.BaseFeatureView { +func CreateBaseFeatureView(name string, features []*model.Field, projection *model.FeatureViewProjection) *model.BaseFeatureView { return &model.BaseFeatureView{ Name: name, Features: features, @@ -201,16 +201,15 @@ func CreateBaseFeatureView(name string, features []*model.Feature, projection *m } } -func CreateNewEntity(name string, valueType types.ValueType_Enum, joinKey string) *model.Entity { +func CreateNewEntity(name string, joinKey string) *model.Entity { return &model.Entity{ - Name: name, - ValueType: valueType, - JoinKey: joinKey, + Name: name, + JoinKey: joinKey, } } -func CreateNewFeature(name string, dtype types.ValueType_Enum) *model.Feature { - return &model.Feature{Name: name, +func CreateNewField(name string, dtype types.ValueType_Enum) *model.Field { + return &model.Field{Name: name, Dtype: dtype, } } @@ -225,7 +224,7 @@ func CreateNewFeatureService(name string, project string, createdTimestamp *time } } -func CreateNewFeatureViewProjection(name string, nameAlias string, features []*model.Feature, joinKeyMap map[string]string) *model.FeatureViewProjection { +func CreateNewFeatureViewProjection(name string, nameAlias string, features []*model.Field, joinKeyMap map[string]string) *model.FeatureViewProjection { return &model.FeatureViewProjection{Name: name, NameAlias: nameAlias, Features: features, @@ -233,10 +232,11 @@ func CreateNewFeatureViewProjection(name string, nameAlias string, features []*m } } -func CreateFeatureView(base *model.BaseFeatureView, ttl *durationpb.Duration, entities []string) *model.FeatureView { +func CreateFeatureView(base *model.BaseFeatureView, ttl *durationpb.Duration, entities []string, entityColumns []*model.Field) *model.FeatureView { return &model.FeatureView{ - Base: base, - Ttl: ttl, - Entities: entities, + Base: base, + Ttl: ttl, + EntityNames: entities, + EntityColumns: entityColumns, } } diff --git a/java/serving/src/test/resources/docker-compose/feast10/definitions.py b/java/serving/src/test/resources/docker-compose/feast10/definitions.py index e514ef0d23..806995ec06 100644 --- a/java/serving/src/test/resources/docker-compose/feast10/definitions.py +++ b/java/serving/src/test/resources/docker-compose/feast10/definitions.py @@ -19,22 +19,23 @@ # Define an entity for the driver. You can think of entity as a primary key used to # fetch features. -driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id",) +driver = Entity(name="driver_id", description="driver id") # Our parquet files contain sample data that includes a driver_id column, timestamps and # three feature column. Here we define a Feature View that will allow us to serve this # data to our model online. driver_hourly_stats_view = FeatureView( name="driver_hourly_stats", - entities=["driver_id"], + entities=[driver], ttl=Duration(seconds=86400 * 7), schema=[ Field(name="conv_rate", dtype=Float64), Field(name="acc_rate", dtype=Float32), Field(name="avg_daily_trips", dtype=Int64), + Field(name="driver_id", dtype=Int64), ], online=True, - batch_source=driver_hourly_stats, + source=driver_hourly_stats, tags={}, ) @@ -75,11 +76,11 @@ def transformed_conv_rate(features_df: pd.DataFrame) -> pd.DataFrame: benchmark_feature_views = [ FeatureView( name=f"feature_view_{i}", - entities=["entity"], + entities=[entity], ttl=Duration(seconds=86400), schema=[Field(name=f"feature_{10 * i + j}", dtype=Int64) for j in range(10)], online=True, - batch_source=generated_data_source, + source=generated_data_source, ) for i in range(25) ] diff --git a/protos/feast/core/FeatureView.proto b/protos/feast/core/FeatureView.proto index 2662350540..c9e38bf344 100644 --- a/protos/feast/core/FeatureView.proto +++ b/protos/feast/core/FeatureView.proto @@ -35,7 +35,7 @@ message FeatureView { FeatureViewMeta meta = 2; } -// Next available id: 12 +// Next available id: 13 // TODO(adchia): refactor common fields from this and ODFV into separate metadata proto message FeatureViewSpec { // Name of the feature view. Must be unique. Not updated. @@ -44,13 +44,15 @@ message FeatureViewSpec { // Name of Feast project that this feature view belongs to. string project = 2; - // List names of entities to associate with the Features defined in this - // Feature View. Not updatable. + // List of names of entities associated with this feature view. repeated string entities = 3; - // List of specifications for each field defined as part of this feature view. + // List of specifications for each feature defined as part of this feature view. repeated FeatureSpecV2 features = 4; + // List of specifications for each entity defined as part of this feature view. + repeated FeatureSpecV2 entity_columns = 12; + // Description of the feature view. string description = 10; diff --git a/sdk/python/feast/entity.py b/sdk/python/feast/entity.py index 1e7c7cf307..29511adcb2 100644 --- a/sdk/python/feast/entity.py +++ b/sdk/python/feast/entity.py @@ -31,7 +31,7 @@ class Entity: Attributes: name: The unique name of the entity. - value_type: The type of the entity, such as string or float. + value_type (deprecated): The type of the entity, such as string or float. join_key: A property that uniquely identifies different entities within the collection. The join_key property is typically used for joining entities with their associated features. If not specified, defaults to the name. @@ -60,7 +60,7 @@ def __init__( self, *args, name: Optional[str] = None, - value_type: ValueType = ValueType.UNKNOWN, + value_type: Optional[ValueType] = None, description: str = "", join_key: Optional[str] = None, tags: Optional[Dict[str, str]] = None, @@ -72,7 +72,7 @@ def __init__( Args: name: The unique name of the entity. - value_type: The type of the entity, such as string or float. + value_type (deprecated): The type of the entity, such as string or float. description: A human-readable description. join_key (deprecated): A property that uniquely identifies different entities within the collection. The join_key property is typically used for joining entities @@ -104,8 +104,23 @@ def __init__( if not self.name: raise ValueError("Name needs to be specified") - self.value_type = value_type + if value_type: + warnings.warn( + ( + "The `value_type` parameter is being deprecated. Instead, the type of an entity " + "should be specified as a Field in the schema of a feature view. Feast 0.23 and " + "onwards will not support the `value_type` parameter. The `entities` parameter of " + "feature views should also be changed to a List[Entity] instead of a List[str]; if " + "this is not done, entity columns will be mistakenly interpreted as feature columns." + ), + DeprecationWarning, + ) + self.value_type = value_type or ValueType.UNKNOWN + # For now, both the `join_key` and `join_keys` attributes are set correctly, + # so both are usable. + # TODO(felixwang9817): Remove the usage of `join_key` throughout the codebase + # when the usage of `join_key` as a parameter is removed. if join_key: warnings.warn( ( @@ -125,6 +140,8 @@ def __init__( self.join_key = join_keys[0] else: self.join_key = join_key if join_key else self.name + if not self.join_keys: + self.join_keys = [self.join_key] self.description = description self.tags = tags if tags is not None else {} self.owner = owner @@ -153,6 +170,9 @@ def __eq__(self, other): def __str__(self): return str(MessageToJson(self.to_proto())) + def __lt__(self, other): + return self.name < other.name + def is_valid(self): """ Validates the state of this entity locally. @@ -179,13 +199,14 @@ def from_proto(cls, entity_proto: EntityProto): """ entity = cls( name=entity_proto.spec.name, - value_type=ValueType(entity_proto.spec.value_type), join_keys=[entity_proto.spec.join_key], description=entity_proto.spec.description, tags=entity_proto.spec.tags, owner=entity_proto.spec.owner, ) + entity.value_type = ValueType(entity_proto.spec.value_type) + if entity_proto.meta.HasField("created_timestamp"): entity.created_timestamp = entity_proto.meta.created_timestamp.ToDatetime() if entity_proto.meta.HasField("last_updated_timestamp"): diff --git a/sdk/python/feast/feature_logging.py b/sdk/python/feast/feature_logging.py index b2d77f68fe..04f30ab81a 100644 --- a/sdk/python/feast/feature_logging.py +++ b/sdk/python/feast/feature_logging.py @@ -11,11 +11,10 @@ FeatureViewNotFoundException, OnDemandFeatureViewNotFoundException, ) -from feast.feature_view import DUMMY_ENTITY_NAME +from feast.feature_view import DUMMY_ENTITY_ID from feast.protos.feast.core.FeatureService_pb2 import ( LoggingConfig as LoggingConfigProto, ) -from feast.types import from_value_type if TYPE_CHECKING: from feast import FeatureService @@ -77,18 +76,14 @@ def get_schema(self, registry: "Registry") -> pa.Schema: fields[field.name] = FEAST_TYPE_TO_ARROW_TYPE[field.dtype] else: - for entity_name in feature_view.entities: - if entity_name == DUMMY_ENTITY_NAME: + for entity_column in feature_view.entity_columns: + if entity_column.name == DUMMY_ENTITY_ID: continue - entity = registry.get_entity(entity_name, self._project) - join_key = projection.join_key_map.get( - entity.join_key, entity.join_key + entity_column.name, entity_column.name ) - fields[join_key] = FEAST_TYPE_TO_ARROW_TYPE[ - from_value_type(entity.value_type) - ] + fields[join_key] = FEAST_TYPE_TO_ARROW_TYPE[entity_column.dtype] for feature in projection.features: fields[ diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 791f1ae40e..768823a68c 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -66,8 +66,7 @@ ) from feast.inference import ( update_data_sources_with_inferred_event_timestamp_col, - update_entities_with_inferred_types_from_feature_views, - update_feature_views_with_inferred_features, + update_feature_views_with_inferred_features_and_entities, ) from feast.infra.infra_object import Infra from feast.infra.provider import Provider, RetrievalJob, get_provider @@ -256,6 +255,7 @@ def _list_feature_views( ): if hide_dummy_entity and fv.entities[0] == DUMMY_ENTITY_NAME: fv.entities = [] + fv.entity_columns = [] feature_views.append(fv) return feature_views @@ -480,10 +480,6 @@ def _make_inferences( feature_services_to_update: List[FeatureService], ): """Makes inferences for entities, feature views, odfvs, and feature services.""" - update_entities_with_inferred_types_from_feature_views( - entities_to_update, views_to_update, self.config - ) - update_data_sources_with_inferred_event_timestamp_col( data_sources_to_update, self.config ) @@ -494,7 +490,7 @@ def _make_inferences( # New feature views may reference previously applied entities. entities = self._list_entities() - update_feature_views_with_inferred_features( + update_feature_views_with_inferred_features_and_entities( views_to_update, entities + entities_to_update, self.config ) @@ -524,11 +520,11 @@ def _plan( Examples: Generate a plan adding an Entity and a FeatureView. - >>> from feast import FeatureStore, Entity, FeatureView, Feature, ValueType, FileSource, RepoConfig + >>> from feast import FeatureStore, Entity, FeatureView, Feature, FileSource, RepoConfig >>> from feast.feature_store import RepoContents >>> from datetime import timedelta >>> fs = FeatureStore(repo_path="feature_repo") - >>> driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id") + >>> driver = Entity(name="driver_id", description="driver id") >>> driver_hourly_stats = FileSource( ... path="feature_repo/data/driver_stats.parquet", ... timestamp_field="event_timestamp", @@ -536,7 +532,7 @@ def _plan( ... ) >>> driver_hourly_stats_view = FeatureView( ... name="driver_hourly_stats", - ... entities=["driver_id"], + ... entities=[driver], ... ttl=timedelta(seconds=86400 * 1), ... batch_source=driver_hourly_stats, ... ) @@ -637,10 +633,10 @@ def apply( Examples: Register an Entity and a FeatureView. - >>> from feast import FeatureStore, Entity, FeatureView, Feature, ValueType, FileSource, RepoConfig + >>> from feast import FeatureStore, Entity, FeatureView, Feature, FileSource, RepoConfig >>> from datetime import timedelta >>> fs = FeatureStore(repo_path="feature_repo") - >>> driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id") + >>> driver = Entity(name="driver_id", description="driver id") >>> driver_hourly_stats = FileSource( ... path="feature_repo/data/driver_stats.parquet", ... timestamp_field="event_timestamp", @@ -648,7 +644,7 @@ def apply( ... ) >>> driver_hourly_stats_view = FeatureView( ... name="driver_hourly_stats", - ... entities=["driver_id"], + ... entities=[driver], ... ttl=timedelta(seconds=86400 * 1), ... batch_source=driver_hourly_stats, ... ) @@ -695,6 +691,9 @@ def apply( data_sources_to_update = list(data_sources_set_to_update) + # Handle all entityless feature views by using DUMMY_ENTITY as a placeholder entity. + entities_to_update.append(DUMMY_ENTITY) + # Validate all feature views and make inferences. self._validate_all_feature_views( views_to_update, odfvs_to_update, request_views_to_update @@ -707,9 +706,6 @@ def apply( services_to_update, ) - # Handle all entityless feature views by using DUMMY_ENTITY as a placeholder entity. - entities_to_update.append(DUMMY_ENTITY) - # Add all objects to the registry and update the provider's infrastructure. for ds in data_sources_to_update: self._registry.apply_data_source(ds, project=self.project, commit=False) @@ -1562,12 +1558,12 @@ def _get_columnar_entity_values( def _get_entity_maps( self, feature_views ) -> Tuple[Dict[str, str], Dict[str, ValueType], Set[str]]: + # TODO(felixwang9817): Support entities that have different types for different feature views. entities = self._list_entities(allow_cache=True, hide_dummy_entity=False) entity_name_to_join_key_map: Dict[str, str] = {} entity_type_map: Dict[str, ValueType] = {} for entity in entities: entity_name_to_join_key_map[entity.name] = entity.join_key - entity_type_map[entity.name] = entity.value_type for feature_view in feature_views: for entity_name in feature_view.entities: entity = self._registry.get_entity( @@ -1582,7 +1578,11 @@ def _get_entity_maps( entity.join_key, entity.join_key ) entity_name_to_join_key_map[entity_name] = join_key - entity_type_map[join_key] = entity.value_type + for entity_column in feature_view.entity_columns: + entity_type_map[ + entity_column.name + ] = entity_column.dtype.to_value_type() + return ( entity_name_to_join_key_map, entity_type_map, diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index 6a8fd33217..12ce9105f7 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -36,7 +36,6 @@ MaterializationInterval as MaterializationIntervalProto, ) from feast.usage import log_exceptions -from feast.value_type import ValueType warnings.simplefilter("once", DeprecationWarning) @@ -44,9 +43,7 @@ DUMMY_ENTITY_ID = "__dummy_id" DUMMY_ENTITY_NAME = "__dummy" DUMMY_ENTITY_VAL = "" -DUMMY_ENTITY = Entity( - name=DUMMY_ENTITY_NAME, join_keys=[DUMMY_ENTITY_ID], value_type=ValueType.STRING, -) +DUMMY_ENTITY = Entity(name=DUMMY_ENTITY_NAME, join_keys=[DUMMY_ENTITY_ID],) class FeatureView(BaseFeatureView): @@ -55,7 +52,7 @@ class FeatureView(BaseFeatureView): Attributes: name: The unique name of the feature view. - entities: The list of entities with which this group of features is associated. + entities: The list of names of entities that this feature view is associated with. ttl: The amount of time this group of features lives. A ttl of 0 indicates that this group of features lives forever. Note that large ttl's or a ttl of 0 can result in extremely computationally intensive queries. @@ -65,9 +62,11 @@ class FeatureView(BaseFeatureView): stream_source (optional): The stream source of data where this group of features is stored. This is deprecated in favor of `source`. schema: The schema of the feature view, including feature, timestamp, and entity - columns. - features: The list of features defined as part of this feature view. Each - feature should also be included in the schema. + columns. If not specified, can be inferred from the underlying data source. + entity_columns: The list of entity columns contained in the schema. If not specified, + can be inferred from the underlying data source. + features: The list of feature columns contained in the schema. If not specified, + can be inferred from the underlying data source. online: A boolean indicating whether online retrieval is enabled for this feature view. description: A human-readable description. @@ -84,6 +83,7 @@ class FeatureView(BaseFeatureView): batch_source: DataSource stream_source: Optional[DataSource] schema: List[Field] + entity_columns: List[Field] features: List[Field] online: bool description: str @@ -129,14 +129,15 @@ def __init__( owner (optional): The owner of the feature view, typically the email of the primary maintainer. schema (optional): The schema of the feature view, including feature, timestamp, - and entity columns. + and entity columns. If entity columns are included in the schema, a List[Entity] + must be passed to `entities` instead of a List[str]; otherwise, the entity columns + will be mistakenly interpreted as feature columns. source (optional): The source of data for this group of features. May be a stream source, or a batch source. If a stream source, the source should contain a batch_source for backfills & batch materialization. Raises: ValueError: A field mapping conflicts with an Entity or a Feature. """ - positional_attributes = ["name", "entities", "ttl"] _name = name @@ -167,11 +168,21 @@ def __init__( raise ValueError("feature view name needs to be specified") self.name = _name + self.entities = ( [e.name if isinstance(e, Entity) else e for e in _entities] if _entities else [DUMMY_ENTITY_NAME] ) + if _entities and isinstance(_entities[0], str): + warnings.warn( + ( + "The `entities` parameter should be a list of `Entity` objects. " + "Feast 0.23 and onwards will not support passing in a list of " + "strings to define entities." + ), + DeprecationWarning, + ) self._initialize_sources(_name, batch_source, stream_source, source) @@ -206,14 +217,30 @@ def __init__( _schema = [Field.from_feature(feature) for feature in features] self.schema = _schema - # TODO(felixwang9817): Infer which fields in the schema are features, timestamps, - # and entities. For right now we assume that all fields are features, since the - # current `features` parameter only accepts feature columns. - _features = _schema + # If a user has added entity fields to schema, then they should also have switched + # to using a List[Entity], in which case entity and feature columns can be separated + # here. Conversely, if the user is still using a List[str], they must not have added + # added entity fields, in which case we can set the `features` attribute directly + # equal to the schema. + _features: List[Field] = [] + self.entity_columns = [] + if _entities and len(_entities) > 0 and isinstance(_entities[0], str): + _features = _schema + else: + join_keys = [] + if _entities: + for entity in _entities: + if isinstance(entity, Entity): + join_keys += entity.join_keys + + for field in _schema: + if field.name in join_keys: + self.entity_columns.append(field) + else: + _features.append(field) - cols = [entity for entity in self.entities] + [ - field.name for field in _features - ] + # TODO(felixwang9817): Add more robust validation of features. + cols = [field.name for field in _schema] for col in cols: if ( self.batch_source.field_mapping is not None @@ -276,7 +303,6 @@ def __hash__(self): def __copy__(self): fv = FeatureView( name=self.name, - entities=self.entities, ttl=self.ttl, source=self.batch_source, stream_source=self.stream_source, @@ -284,6 +310,13 @@ def __copy__(self): tags=self.tags, online=self.online, ) + + # This is deliberately set outside of the FV initialization to avoid the deprecation warning. + # TODO(felixwang9817): Move this into the FV initialization when the deprecation warning + # is removed. + fv.entities = self.entities + fv.features = copy.copy(self.features) + fv.entity_columns = copy.copy(self.entity_columns) fv.projection = copy.copy(self.projection) fv.entities = self.entities return fv @@ -303,12 +336,17 @@ def __eq__(self, other): or self.online != other.online or self.batch_source != other.batch_source or self.stream_source != other.stream_source - or self.schema != other.schema + or sorted(self.entity_columns) != sorted(other.entity_columns) ): return False return True + @property + def join_keys(self) -> List[str]: + """Returns a list of all the join keys.""" + return [entity.name for entity in self.entity_columns] + def ensure_valid(self): """ Validates the state of this feature view locally. @@ -394,7 +432,8 @@ def to_proto(self) -> FeatureViewProto: spec = FeatureViewSpecProto( name=self.name, entities=self.entities, - features=[field.to_proto() for field in self.schema], + entity_columns=[field.to_proto() for field in self.entity_columns], + features=[field.to_proto() for field in self.features], description=self.description, tags=self.tags, owner=self.owner, @@ -425,11 +464,6 @@ def from_proto(cls, feature_view_proto: FeatureViewProto): ) feature_view = cls( name=feature_view_proto.spec.name, - entities=[entity for entity in feature_view_proto.spec.entities], - schema=[ - Field.from_proto(field_proto) - for field_proto in feature_view_proto.spec.features - ], description=feature_view_proto.spec.description, tags=dict(feature_view_proto.spec.tags), owner=feature_view_proto.spec.owner, @@ -444,6 +478,19 @@ def from_proto(cls, feature_view_proto: FeatureViewProto): if stream_source: feature_view.stream_source = stream_source + # This avoids the deprecation warning. + feature_view.entities = feature_view_proto.spec.entities + + # Instead of passing in a schema, we set the features and entity columns. + feature_view.features = [ + Field.from_proto(field_proto) + for field_proto in feature_view_proto.spec.features + ] + feature_view.entity_columns = [ + Field.from_proto(field_proto) + for field_proto in feature_view_proto.spec.entity_columns + ] + # FeatureViewProjections are not saved in the FeatureView proto. # Create the default projection. feature_view.projection = FeatureViewProjection.from_definition(feature_view) diff --git a/sdk/python/feast/inference.py b/sdk/python/feast/inference.py index 076cbc86ce..aed90c4ac8 100644 --- a/sdk/python/feast/inference.py +++ b/sdk/python/feast/inference.py @@ -1,73 +1,20 @@ import re -from typing import List +from typing import List, Set -from feast import BigQuerySource, Entity, FileSource, RedshiftSource, SnowflakeSource from feast.data_source import DataSource, PushSource, RequestSource +from feast.entity import Entity from feast.errors import RegistryInferenceFailure -from feast.feature_view import FeatureView +from feast.feature_view import DUMMY_ENTITY_ID, DUMMY_ENTITY_NAME, FeatureView from feast.field import Field, from_value_type +from feast.infra.offline_stores.bigquery_source import BigQuerySource +from feast.infra.offline_stores.file_source import FileSource +from feast.infra.offline_stores.redshift_source import RedshiftSource +from feast.infra.offline_stores.snowflake_source import SnowflakeSource from feast.repo_config import RepoConfig +from feast.types import String from feast.value_type import ValueType -def update_entities_with_inferred_types_from_feature_views( - entities: List[Entity], feature_views: List[FeatureView], config: RepoConfig -) -> None: - """ - Infers the types of the entities by examining the schemas of feature view batch sources. - - Args: - entities: The entities to be updated. - feature_views: A list containing feature views associated with the entities. - config: The config for the current feature store. - """ - incomplete_entities = { - entity.name: entity - for entity in entities - if entity.value_type == ValueType.UNKNOWN - } - incomplete_entities_keys = incomplete_entities.keys() - - for view in feature_views: - if not (incomplete_entities_keys & set(view.entities)): - continue # skip if view doesn't contain any entities that need inference - - col_names_and_types = list( - view.batch_source.get_table_column_names_and_types(config) - ) - for entity_name in view.entities: - if entity_name in incomplete_entities: - entity = incomplete_entities[entity_name] - - # get entity information from information extracted from the view batch source - extracted_entity_name_type_pairs = list( - filter(lambda tup: tup[0] == entity.join_key, col_names_and_types,) - ) - if len(extracted_entity_name_type_pairs) == 0: - # Doesn't mention inference error because would also be an error without inferencing - raise ValueError( - f"""No column in the batch source for the {view.name} feature view matches - its entity's name.""" - ) - - inferred_value_type = view.batch_source.source_datatype_to_feast_value_type()( - extracted_entity_name_type_pairs[0][1] - ) - - if ( - entity.value_type != ValueType.UNKNOWN - and entity.value_type != inferred_value_type - ) or (len(extracted_entity_name_type_pairs) > 1): - raise RegistryInferenceFailure( - "Entity", - f"""Entity value_type inference failed for {entity_name} entity. - Multiple viable matches. - """, - ) - - entity.value_type = inferred_value_type - - def update_data_sources_with_inferred_event_timestamp_col( data_sources: List[DataSource], config: RepoConfig ) -> None: @@ -140,77 +87,144 @@ def update_data_sources_with_inferred_event_timestamp_col( ) -def update_feature_views_with_inferred_features( +def update_feature_views_with_inferred_features_and_entities( fvs: List[FeatureView], entities: List[Entity], config: RepoConfig ) -> None: """ - Infers the set of features associated to each FeatureView and updates the FeatureView with those features. - Inference occurs through considering each column of the underlying data source as a feature except columns that are - associated with the data source's timestamp columns and the FeatureView's entity columns. + Infers the features and entities associated with each feature view and updates it in place. + + Columns whose names match a join key of an entity are considered to be entity columns; all + other columns except designated timestamp columns are considered to be feature columns. If + the feature view already has features, feature inference is skipped. Args: fvs: The feature views to be updated. entities: A list containing entities associated with the feature views. config: The config for the current feature store. """ - entity_name_to_join_key_map = {entity.name: entity.join_key for entity in entities} - join_keys = entity_name_to_join_key_map.values() + entity_name_to_entity_map = {e.name: e for e in entities} + entity_name_to_join_keys_map = {e.name: e.join_keys for e in entities} for fv in fvs: - # First drop all Entity fields. Then infer features if necessary. - fv.schema = [field for field in fv.schema if field.name not in join_keys] - fv.features = [field for field in fv.features if field.name not in join_keys] - - if not fv.features: - columns_to_exclude = { - fv.batch_source.timestamp_field, - fv.batch_source.created_timestamp_column, - } | { - entity_name_to_join_key_map[entity_name] for entity_name in fv.entities - } - - if fv.batch_source.timestamp_field in fv.batch_source.field_mapping: - columns_to_exclude.add( - fv.batch_source.field_mapping[fv.batch_source.timestamp_field] - ) + join_keys = set( + [ + join_key + for entity_name in fv.entities + for join_key in entity_name_to_join_keys_map[entity_name] + ] + ) + + # Fields whose names match a join key are considered to be entity columns; all + # other fields are considered to be feature columns. + for field in fv.schema: + if field.name in join_keys: + # Do not override a preexisting field with the same name. + if field.name not in [ + entity_column.name for entity_column in fv.entity_columns + ]: + fv.entity_columns.append(field) + else: + if field.name not in [feature.name for feature in fv.features]: + fv.features.append(field) + + # Since the `value_type` parameter has not yet been fully deprecated for + # entities, we respect the `value_type` attribute if it still exists. + for entity_name in fv.entities: + entity = entity_name_to_entity_map[entity_name] if ( - fv.batch_source.created_timestamp_column - in fv.batch_source.field_mapping + entity.join_key + not in [entity_column.name for entity_column in fv.entity_columns] + and entity.value_type != ValueType.UNKNOWN ): - columns_to_exclude.add( - fv.batch_source.field_mapping[ - fv.batch_source.created_timestamp_column - ] + fv.entity_columns.append( + Field( + name=entity.join_key, dtype=from_value_type(entity.value_type), + ) ) - for ( - col_name, - col_datatype, - ) in fv.batch_source.get_table_column_names_and_types(config): - if col_name not in columns_to_exclude and not re.match( - "^__|__$", - col_name, # double underscores often signal an internal-use column - ): - feature_name = ( - fv.batch_source.field_mapping[col_name] - if col_name in fv.batch_source.field_mapping - else col_name - ) - field = Field( - name=feature_name, - dtype=from_value_type( - fv.batch_source.source_datatype_to_feast_value_type()( - col_datatype - ) - ), - ) - # Note that schema and features are two different attributes of a - # FeatureView, and that features should be present in both. - fv.schema.append(field) - fv.features.append(field) + # Infer a dummy entity column for entityless feature views. + if len(fv.entities) == 1 and fv.entities[0] == DUMMY_ENTITY_NAME: + fv.entity_columns.append(Field(name=DUMMY_ENTITY_ID, dtype=String)) + + # Run inference for entity columns if there are fewer entity fields than expected. + num_expected_join_keys = sum( + [ + len(entity_name_to_join_keys_map[entity_name]) + for entity_name in fv.entities + ] + ) + run_inference_for_entities = len(fv.entity_columns) < num_expected_join_keys + + # Run inference for feature columns if there are no feature fields. + run_inference_for_features = len(fv.features) == 0 + + if run_inference_for_entities or run_inference_for_features: + _infer_features_and_entities( + fv, join_keys, run_inference_for_features, config, + ) if not fv.features: raise RegistryInferenceFailure( "FeatureView", f"Could not infer Features for the FeatureView named {fv.name}.", ) + + +def _infer_features_and_entities( + fv: FeatureView, join_keys: Set[str], run_inference_for_features, config, +) -> None: + """ + Updates the specific feature in place with inferred features and entities. + + Args: + fv: The feature view on which to run inference. + join_keys: The set of join keys for the feature view's entities. + run_inference_for_features: Whether to run inference for features. + config: The config for the current feature store. + """ + columns_to_exclude = { + fv.batch_source.timestamp_field, + fv.batch_source.created_timestamp_column, + } + for column in columns_to_exclude: + if column in fv.batch_source.field_mapping: + columns_to_exclude.remove(column) + columns_to_exclude.add(fv.batch_source.field_mapping[column]) + + table_column_names_and_types = fv.batch_source.get_table_column_names_and_types( + config + ) + + for col_name, col_datatype in table_column_names_and_types: + if col_name in columns_to_exclude: + continue + elif col_name in join_keys: + field = Field( + name=col_name, + dtype=from_value_type( + fv.batch_source.source_datatype_to_feast_value_type()(col_datatype) + ), + ) + if field.name not in [ + entity_column.name for entity_column in fv.entity_columns + ]: + fv.entity_columns.append(field) + elif not re.match( + "^__|__$", col_name + ): # double underscores often signal an internal-use column + if run_inference_for_features: + feature_name = ( + fv.batch_source.field_mapping[col_name] + if col_name in fv.batch_source.field_mapping + else col_name + ) + field = Field( + name=feature_name, + dtype=from_value_type( + fv.batch_source.source_datatype_to_feast_value_type()( + col_datatype + ) + ), + ) + if field.name not in [feature.name for feature in fv.features]: + fv.features.append(field) diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index 7670ad67b7..de1168a86f 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -200,10 +200,9 @@ def evaluate_historical_retrieval(): # Build a list of entity columns to join on (from the right table) join_keys = [] - for entity_name in feature_view.entities: - entity = registry.get_entity(entity_name, project) + for entity_column in feature_view.entity_columns: join_key = feature_view.projection.join_key_map.get( - entity.join_key, entity.join_key + entity_column.name, entity_column.name ) join_keys.append(join_key) diff --git a/sdk/python/feast/infra/offline_stores/offline_utils.py b/sdk/python/feast/infra/offline_stores/offline_utils.py index b6c3d300d4..dad0ca5b78 100644 --- a/sdk/python/feast/infra/offline_stores/offline_utils.py +++ b/sdk/python/feast/infra/offline_stores/offline_utils.py @@ -60,11 +60,9 @@ def get_expected_join_keys( ) -> Set[str]: join_keys = set() for feature_view in feature_views: - entities = feature_view.entities - for entity_name in entities: - entity = registry.get_entity(entity_name, project) + for entity_column in feature_view.entity_columns: join_key = feature_view.projection.join_key_map.get( - entity.join_key, entity.join_key + entity_column.name, entity_column.name ) join_keys.add(join_key) return join_keys @@ -114,14 +112,14 @@ def get_feature_view_query_context( query_context = [] for feature_view, features in feature_views_to_feature_map.items(): - join_keys, entity_selections = [], [] - for entity_name in feature_view.entities: - entity = registry.get_entity(entity_name, project) + join_keys: List[str] = [] + entity_selections: List[str] = [] + for entity_column in feature_view.entity_columns: join_key = feature_view.projection.join_key_map.get( - entity.join_key, entity.join_key + entity_column.name, entity_column.name ) join_keys.append(join_key) - entity_selections.append(f"{entity.join_key} AS {join_key}") + entity_selections.append(f"{entity_column.name} AS {join_key}") if isinstance(feature_view.ttl, timedelta): ttl_seconds = int(feature_view.ttl.total_seconds()) diff --git a/sdk/python/feast/infra/online_stores/redis.py b/sdk/python/feast/infra/online_stores/redis.py index 9ceceff0ac..1a8eedb21e 100644 --- a/sdk/python/feast/infra/online_stores/redis.py +++ b/sdk/python/feast/infra/online_stores/redis.py @@ -106,9 +106,9 @@ def update( (usually this happens when the last feature view that was using specific compound key is deleted) and remove all features attached to this "join_keys". """ - join_keys_to_keep = set(tuple(table.entities) for table in tables_to_keep) + join_keys_to_keep = set(tuple(table.join_keys) for table in tables_to_keep) - join_keys_to_delete = set(tuple(table.entities) for table in tables_to_delete) + join_keys_to_delete = set(tuple(table.join_keys) for table in tables_to_delete) for join_keys in join_keys_to_delete - join_keys_to_keep: self.delete_entity_values(config, list(join_keys)) @@ -122,7 +122,7 @@ def teardown( """ We delete the keys in redis for tables/views being removed. """ - join_keys_to_delete = set(tuple(table.entities) for table in tables) + join_keys_to_delete = set(tuple(table.join_keys) for table in tables) for join_keys in join_keys_to_delete: self.delete_entity_values(config, list(join_keys)) diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py index b5965c91bf..f01fd9bac6 100644 --- a/sdk/python/feast/infra/passthrough_provider.py +++ b/sdk/python/feast/infra/passthrough_provider.py @@ -171,11 +171,16 @@ def materialize_single_feature_view( if feature_view.batch_source.field_mapping is not None: table = _run_field_mapping(table, feature_view.batch_source.field_mapping) - join_keys = {entity.join_key: entity.value_type for entity in entities} + join_key_to_value_type = { + entity.name: entity.dtype.to_value_type() + for entity in feature_view.entity_columns + } with tqdm_builder(table.num_rows) as pbar: for batch in table.to_batches(DEFAULT_BATCH_SIZE): - rows_to_write = _convert_arrow_to_proto(batch, feature_view, join_keys) + rows_to_write = _convert_arrow_to_proto( + batch, feature_view, join_key_to_value_type + ) self.online_write_batch( self.repo_config, feature_view, diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index 7754a58319..cd82b7d416 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -390,7 +390,7 @@ def _convert_arrow_to_proto( table = table.to_batches()[0] columns = [ - (field.name, field.dtype.to_value_type()) for field in feature_view.schema + (field.name, field.dtype.to_value_type()) for field in feature_view.features ] + list(join_keys.items()) proto_values_by_column = { diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index f2048e7f5e..6d1eb455ca 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -666,7 +666,7 @@ def decorator(user_function): def feature_view_to_batch_feature_view(fv: FeatureView) -> BatchFeatureView: - return BatchFeatureView( + bfv = BatchFeatureView( name=fv.name, entities=fv.entities, ttl=fv.ttl, @@ -676,3 +676,7 @@ def feature_view_to_batch_feature_view(fv: FeatureView) -> BatchFeatureView: schema=fv.schema, source=fv.source, ) + + bfv.features = copy.copy(fv.features) + bfv.entities = copy.copy(fv.entities) + return bfv diff --git a/sdk/python/feast/templates/aws/driver_repo.py b/sdk/python/feast/templates/aws/driver_repo.py index 8ebe0b6e92..f80f16bb6f 100644 --- a/sdk/python/feast/templates/aws/driver_repo.py +++ b/sdk/python/feast/templates/aws/driver_repo.py @@ -1,6 +1,6 @@ from datetime import timedelta -from feast import Entity, FeatureService, FeatureView, Field, RedshiftSource, ValueType +from feast import Entity, FeatureService, FeatureView, Field, RedshiftSource from feast.types import Float32, Int64 # Define an entity for the driver. Entities can be thought of as primary keys used to @@ -13,8 +13,6 @@ # features can be looked up. The join keys are also used to join feature # tables/views when building feature vectors join_keys=["driver_id"], - # The storage level type for an entity - value_type=ValueType.INT64, ) # Indicates a data source from which feature values can be retrieved. Sources are queried when building training @@ -41,7 +39,7 @@ # The list of entities specifies the keys required for joining or looking # up features from this feature view. The reference provided in this field # correspond to the name of a defined entity (or entities) - entities=["driver"], + entities=[driver], # The timedelta is the maximum age that each feature value may have # relative to its lookup time. For historical features (used in training), # TTL is relative to each timestamp provided in the entity dataframe. diff --git a/sdk/python/feast/templates/gcp/driver_repo.py b/sdk/python/feast/templates/gcp/driver_repo.py index a4517516b5..acb17d5519 100644 --- a/sdk/python/feast/templates/gcp/driver_repo.py +++ b/sdk/python/feast/templates/gcp/driver_repo.py @@ -1,6 +1,6 @@ from datetime import timedelta -from feast import BigQuerySource, Entity, FeatureService, FeatureView, Field, ValueType +from feast import BigQuerySource, Entity, FeatureService, FeatureView, Field from feast.types import Float32, Int64 # Define an entity for the driver. Entities can be thought of as primary keys used to @@ -13,8 +13,6 @@ # features can be looked up. The join keys are also used to join feature # tables/views when building feature vectors join_keys=["driver_id"], - # The storage level type for an entity - value_type=ValueType.INT64, ) # Indicates a data source from which feature values can be retrieved. Sources are queried when building training @@ -39,7 +37,7 @@ # The list of entities specifies the keys required for joining or looking # up features from this feature view. The reference provided in this field # correspond to the name of a defined entity (or entities) - entities=["driver"], + entities=[driver], # The timedelta is the maximum age that each feature value may have # relative to its lookup time. For historical features (used in training), # TTL is relative to each timestamp provided in the entity dataframe. diff --git a/sdk/python/feast/templates/hbase/example.py b/sdk/python/feast/templates/hbase/example.py index 1d441e0e99..b34696185b 100644 --- a/sdk/python/feast/templates/hbase/example.py +++ b/sdk/python/feast/templates/hbase/example.py @@ -2,7 +2,7 @@ from datetime import timedelta -from feast import Entity, FeatureView, Field, FileSource, ValueType +from feast import Entity, FeatureView, Field, FileSource from feast.types import Float32, Int64 # Read data from parquet files. Parquet is convenient for local development mode. For @@ -16,7 +16,7 @@ # Define an entity for the driver. You can think of entity as a primary key used to # fetch features. -driver = Entity(name="driver", join_keys=["driver_id"], value_type=ValueType.INT64,) +driver = Entity(name="driver", join_keys=["driver_id"]) # Our parquet files contain sample data that includes a driver_id column, timestamps and # three feature column. Here we define a Feature View that will allow us to serve this diff --git a/sdk/python/feast/templates/local/example.py b/sdk/python/feast/templates/local/example.py index 7633947e6e..30f9adf189 100644 --- a/sdk/python/feast/templates/local/example.py +++ b/sdk/python/feast/templates/local/example.py @@ -2,7 +2,7 @@ from datetime import timedelta -from feast import Entity, FeatureService, FeatureView, Field, FileSource, ValueType +from feast import Entity, FeatureService, FeatureView, Field, FileSource from feast.types import Float32, Int64 # Read data from parquet files. Parquet is convenient for local development mode. For @@ -16,14 +16,14 @@ # Define an entity for the driver. You can think of entity as a primary key used to # fetch features. -driver = Entity(name="driver", join_keys=["driver_id"], value_type=ValueType.INT64,) +driver = Entity(name="driver", join_keys=["driver_id"]) # Our parquet files contain sample data that includes a driver_id column, timestamps and # three feature column. Here we define a Feature View that will allow us to serve this # data to our model online. driver_hourly_stats_view = FeatureView( name="driver_hourly_stats", - entities=["driver"], + entities=[driver], ttl=timedelta(days=1), schema=[ Field(name="conv_rate", dtype=Float32), diff --git a/sdk/python/feast/templates/postgres/driver_repo.py b/sdk/python/feast/templates/postgres/driver_repo.py index 34bc0022e2..4096943bb7 100644 --- a/sdk/python/feast/templates/postgres/driver_repo.py +++ b/sdk/python/feast/templates/postgres/driver_repo.py @@ -18,7 +18,7 @@ driver_stats_fv = FeatureView( name="driver_hourly_stats", - entities=["driver_id"], + entities=[driver], ttl=timedelta(weeks=52), schema=[ Field(name="conv_rate", dtype=Float32), diff --git a/sdk/python/feast/templates/snowflake/driver_repo.py b/sdk/python/feast/templates/snowflake/driver_repo.py index ceaf0ba8de..297a3f5ef0 100644 --- a/sdk/python/feast/templates/snowflake/driver_repo.py +++ b/sdk/python/feast/templates/snowflake/driver_repo.py @@ -43,7 +43,7 @@ # The list of entities specifies the keys required for joining or looking # up features from this feature view. The reference provided in this field # correspond to the name of a defined entity (or entities) - entities=["driver"], + entities=[driver], # The timedelta is the maximum age that each feature value may have # relative to its lookup time. For historical features (used in training), # TTL is relative to each timestamp provided in the entity dataframe. diff --git a/sdk/python/feast/templates/spark/example.py b/sdk/python/feast/templates/spark/example.py index da334dd83c..d006353118 100644 --- a/sdk/python/feast/templates/spark/example.py +++ b/sdk/python/feast/templates/spark/example.py @@ -5,7 +5,7 @@ from datetime import timedelta from pathlib import Path -from feast import Entity, FeatureService, FeatureView, Field, ValueType +from feast import Entity, FeatureService, FeatureView, Field from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import ( SparkSource, ) @@ -16,10 +16,8 @@ # Entity definitions -driver = Entity(name="driver", value_type=ValueType.INT64, description="driver id",) -customer = Entity( - name="customer", value_type=ValueType.INT64, description="customer id", -) +driver = Entity(name="driver", description="driver id",) +customer = Entity(name="customer", description="customer id",) # Sources driver_hourly_stats = SparkSource( @@ -40,7 +38,7 @@ # Feature Views driver_hourly_stats_view = FeatureView( name="driver_hourly_stats", - entities=["driver"], + entities=[driver], ttl=timedelta(days=7), schema=[ Field(name="conv_rate", dtype=Float32), @@ -53,7 +51,7 @@ ) customer_daily_profile_view = FeatureView( name="customer_daily_profile", - entities=["customer"], + entities=[customer], ttl=timedelta(days=7), schema=[ Field(name="current_balance", dtype=Float32), diff --git a/sdk/python/pytest.ini b/sdk/python/pytest.ini new file mode 100644 index 0000000000..07a5e869dc --- /dev/null +++ b/sdk/python/pytest.ini @@ -0,0 +1,4 @@ +[pytest] +markers = + universal_offline_stores: mark a test as using all offline stores. + universal_online_stores: mark a test as using all online stores. \ No newline at end of file diff --git a/sdk/python/tests/data/data_creator.py b/sdk/python/tests/data/data_creator.py index e08597b67b..186c39b9ef 100644 --- a/sdk/python/tests/data/data_creator.py +++ b/sdk/python/tests/data/data_creator.py @@ -4,11 +4,11 @@ import pandas as pd from pytz import timezone, utc -from feast.value_type import ValueType +from feast.types import FeastType, Float32, Int32, Int64, String def create_dataset( - entity_type: ValueType = ValueType.INT32, + entity_type: FeastType = Int32, feature_dtype: str = None, feature_is_list: bool = False, list_has_empty_list: bool = False, @@ -16,7 +16,7 @@ def create_dataset( now = datetime.utcnow().replace(microsecond=0, second=0, minute=0) ts = pd.Timestamp(now).round("ms") data = { - "driver_id": get_entities_for_value_type(entity_type), + "driver_id": get_entities_for_feast_type(entity_type), "value": get_feature_values_for_dtype( feature_dtype, feature_is_list, list_has_empty_list ), @@ -37,14 +37,14 @@ def create_dataset( return pd.DataFrame.from_dict(data) -def get_entities_for_value_type(value_type: ValueType) -> List: - value_type_map: Dict[ValueType, List] = { - ValueType.INT32: [1, 2, 1, 3, 3], - ValueType.INT64: [1, 2, 1, 3, 3], - ValueType.FLOAT: [1.0, 2.0, 1.0, 3.0, 3.0], - ValueType.STRING: ["1", "2", "1", "3", "3"], +def get_entities_for_feast_type(feast_type: FeastType) -> List: + feast_type_map: Dict[FeastType, List] = { + Int32: [1, 2, 1, 3, 3], + Int64: [1, 2, 1, 3, 3], + Float32: [1.0, 2.0, 1.0, 3.0, 3.0], + String: ["1", "2", "1", "3", "3"], } - return value_type_map[value_type] + return feast_type_map[feast_type] def get_feature_values_for_dtype( diff --git a/sdk/python/tests/doctest/test_all.py b/sdk/python/tests/doctest/test_all.py index 65d5f3da28..31f181ad53 100644 --- a/sdk/python/tests/doctest/test_all.py +++ b/sdk/python/tests/doctest/test_all.py @@ -11,15 +11,13 @@ def setup_feature_store(): """Prepares the local environment for a FeatureStore docstring test.""" from datetime import datetime, timedelta - from feast import Entity, FeatureStore, FeatureView, Field, FileSource, ValueType + from feast import Entity, FeatureStore, FeatureView, Field, FileSource from feast.repo_operations import init_repo from feast.types import Float32, Int64 init_repo("feature_repo", "local") fs = FeatureStore(repo_path="feature_repo") - driver = Entity( - name="driver_id", value_type=ValueType.INT64, description="driver id", - ) + driver = Entity(name="driver_id", description="driver id",) driver_hourly_stats = FileSource( path="feature_repo/data/driver_stats.parquet", timestamp_field="event_timestamp", @@ -27,7 +25,7 @@ def setup_feature_store(): ) driver_hourly_stats_view = FeatureView( name="driver_hourly_stats", - entities=["driver_id"], + entities=[driver], ttl=timedelta(seconds=86400 * 1), schema=[ Field(name="conv_rate", dtype=Float32), diff --git a/sdk/python/tests/example_repos/example_feature_repo_1.py b/sdk/python/tests/example_repos/example_feature_repo_1.py index d8b6d7c89b..8d6d96d9ef 100644 --- a/sdk/python/tests/example_repos/example_feature_repo_1.py +++ b/sdk/python/tests/example_repos/example_feature_repo_1.py @@ -1,14 +1,6 @@ from datetime import timedelta -from feast import ( - BigQuerySource, - Entity, - FeatureService, - FeatureView, - Field, - PushSource, - ValueType, -) +from feast import BigQuerySource, Entity, FeatureService, FeatureView, Field, PushSource from feast.types import Float32, Int64, String driver_locations_source = BigQuerySource( @@ -46,22 +38,24 @@ driver = Entity( name="driver", # The name is derived from this argument, not object name. join_keys=["driver_id"], - value_type=ValueType.INT64, description="driver id", ) customer = Entity( name="customer", # The name is derived from this argument, not object name. join_keys=["customer_id"], - value_type=ValueType.STRING, ) driver_locations = FeatureView( name="driver_locations", - entities=["driver"], + entities=[driver], ttl=timedelta(days=1), - schema=[Field(name="lat", dtype=Float32), Field(name="lon", dtype=String)], + schema=[ + Field(name="lat", dtype=Float32), + Field(name="lon", dtype=String), + Field(name="driver_id", dtype=Int64), + ], online=True, batch_source=driver_locations_source, tags={}, @@ -69,11 +63,12 @@ pushed_driver_locations = FeatureView( name="pushed_driver_locations", - entities=["driver"], + entities=[driver], ttl=timedelta(days=1), schema=[ Field(name="driver_lat", dtype=Float32), Field(name="driver_long", dtype=String), + Field(name="driver_id", dtype=Int64), ], online=True, stream_source=driver_locations_push_source, @@ -82,12 +77,13 @@ customer_profile = FeatureView( name="customer_profile", - entities=["customer"], + entities=[customer], ttl=timedelta(days=1), schema=[ Field(name="avg_orders_day", dtype=Float32), Field(name="name", dtype=String), Field(name="age", dtype=Int64), + Field(name="customer_id", dtype=String), ], online=True, batch_source=customer_profile_source, @@ -96,9 +92,13 @@ customer_driver_combined = FeatureView( name="customer_driver_combined", - entities=["customer", "driver"], + entities=[customer, driver], ttl=timedelta(days=1), - schema=[Field(name="trips", dtype=Int64)], + schema=[ + Field(name="trips", dtype=Int64), + Field(name="driver_id", dtype=Int64), + Field(name="customer_id", dtype=String), + ], online=True, batch_source=customer_driver_combined_source, tags={}, diff --git a/sdk/python/tests/example_repos/example_feature_repo_2.py b/sdk/python/tests/example_repos/example_feature_repo_2.py index d4c7976418..073c48c1c1 100644 --- a/sdk/python/tests/example_repos/example_feature_repo_2.py +++ b/sdk/python/tests/example_repos/example_feature_repo_2.py @@ -1,6 +1,6 @@ from datetime import timedelta -from feast import Entity, FeatureView, Field, FileSource, ValueType +from feast import Entity, FeatureView, Field, FileSource from feast.types import Float32, Int32, Int64 driver_hourly_stats = FileSource( @@ -9,17 +9,18 @@ created_timestamp_column="created", ) -driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id") +driver = Entity(name="driver_id", description="driver id",) driver_hourly_stats_view = FeatureView( name="driver_hourly_stats", - entities=["driver_id"], + entities=[driver], ttl=timedelta(days=1), schema=[ Field(name="conv_rate", dtype=Float32), Field(name="acc_rate", dtype=Float32), Field(name="avg_daily_trips", dtype=Int64), + Field(name="driver_id", dtype=Int32), ], online=True, source=driver_hourly_stats, diff --git a/sdk/python/tests/example_repos/example_feature_repo_with_duplicated_featureview_names.py b/sdk/python/tests/example_repos/example_feature_repo_with_duplicated_featureview_names.py index cbcc3ad172..4b079999ed 100644 --- a/sdk/python/tests/example_repos/example_feature_repo_with_duplicated_featureview_names.py +++ b/sdk/python/tests/example_repos/example_feature_repo_with_duplicated_featureview_names.py @@ -1,14 +1,16 @@ from datetime import timedelta -from feast import FeatureView, FileSource +from feast import Entity, FeatureView, FileSource driver_hourly_stats = FileSource( path="driver_stats.parquet", # this parquet is not real and will not be read ) +driver = Entity(name="driver_id", description="driver id", join_keys=["driver"],) + driver_hourly_stats_view = FeatureView( name="driver_hourly_stats", # Intentionally use the same FeatureView name - entities=["driver_id"], + entities=[driver], online=False, source=driver_hourly_stats, ttl=timedelta(days=1), @@ -17,7 +19,7 @@ driver_hourly_stats_view_dup1 = FeatureView( name="driver_hourly_stats", # Intentionally use the same FeatureView name - entities=["driver_id"], + entities=[driver], online=False, source=driver_hourly_stats, ttl=timedelta(days=1), diff --git a/sdk/python/tests/example_repos/example_feature_repo_with_entity_join_key.py b/sdk/python/tests/example_repos/example_feature_repo_with_entity_join_key.py index 5ba26d2573..0663150531 100644 --- a/sdk/python/tests/example_repos/example_feature_repo_with_entity_join_key.py +++ b/sdk/python/tests/example_repos/example_feature_repo_with_entity_join_key.py @@ -1,6 +1,6 @@ from datetime import timedelta -from feast import Entity, FeatureView, Field, FileSource, ValueType +from feast import Entity, FeatureView, Field, FileSource from feast.types import Float32, Int64 driver_hourly_stats = FileSource( @@ -11,17 +11,12 @@ # The join key here is deliberately different from the parquet file to test the failure path. -driver = Entity( - name="driver_id", - value_type=ValueType.INT64, - description="driver id", - join_keys=["driver"], -) +driver = Entity(name="driver_id", description="driver id", join_keys=["driver"],) driver_hourly_stats_view = FeatureView( name="driver_hourly_stats", - entities=["driver_id"], + entities=[driver], ttl=timedelta(days=1), schema=[ Field(name="conv_rate", dtype=Float32), diff --git a/sdk/python/tests/example_repos/on_demand_feature_view_repo.py b/sdk/python/tests/example_repos/on_demand_feature_view_repo.py index 453158b9dc..ac572d5747 100644 --- a/sdk/python/tests/example_repos/on_demand_feature_view_repo.py +++ b/sdk/python/tests/example_repos/on_demand_feature_view_repo.py @@ -2,7 +2,7 @@ import pandas as pd -from feast import FeatureView, Field, FileSource +from feast import Entity, FeatureView, Field, FileSource from feast.on_demand_feature_view import on_demand_feature_view from feast.types import Float32, String @@ -15,15 +15,18 @@ owner="test2@gmail.com", ) +driver = Entity(name="driver_id", description="driver id",) + driver_daily_features_view = FeatureView( name="driver_daily_features", - entities=["driver"], + entities=[driver], ttl=timedelta(seconds=8640000000), schema=[ Field(name="daily_miles_driven", dtype=Float32), Field(name="lat", dtype=Float32), Field(name="lon", dtype=Float32), Field(name="string_feature", dtype=String), + Field(name="driver_id", dtype=Float32), ], online=True, source=driver_stats, diff --git a/sdk/python/tests/integration/e2e/test_usage_e2e.py b/sdk/python/tests/integration/e2e/test_usage_e2e.py index 12c1eb8628..53e4a32a82 100644 --- a/sdk/python/tests/integration/e2e/test_usage_e2e.py +++ b/sdk/python/tests/integration/e2e/test_usage_e2e.py @@ -19,7 +19,7 @@ import pytest -from feast import Entity, RepoConfig, ValueType +from feast import Entity, RepoConfig from feast.infra.online_stores.sqlite import SqliteOnlineStoreConfig @@ -60,7 +60,6 @@ def test_usage_on(dummy_exporter, enabling_toggle): entity = Entity( name="driver_car_id", description="Car driver id", - value_type=ValueType.STRING, tags={"team": "matchmaking"}, ) @@ -99,7 +98,6 @@ def test_usage_off(dummy_exporter, enabling_toggle): entity = Entity( name="driver_car_id", description="Car driver id", - value_type=ValueType.STRING, tags={"team": "matchmaking"}, ) test_feature_store.apply([entity]) diff --git a/sdk/python/tests/integration/feature_repos/universal/entities.py b/sdk/python/tests/integration/feature_repos/universal/entities.py index b7a7583f1b..66989b0646 100644 --- a/sdk/python/tests/integration/feature_repos/universal/entities.py +++ b/sdk/python/tests/integration/feature_repos/universal/entities.py @@ -1,22 +1,21 @@ -from feast import Entity, ValueType +from feast import Entity -def driver(value_type: ValueType = ValueType.INT64): +def driver(): return Entity( name="driver", # The name is derived from this argument, not object name. - value_type=value_type, description="driver id", join_keys=["driver_id"], ) def customer(): - return Entity(name="customer_id", value_type=ValueType.INT64) + return Entity(name="customer_id") def location(): - return Entity(name="location_id", value_type=ValueType.INT64) + return Entity(name="location_id") def item(): - return Entity(name="item_id", value_type=ValueType.INT64) + return Entity(name="item_id") diff --git a/sdk/python/tests/integration/feature_repos/universal/feature_views.py b/sdk/python/tests/integration/feature_repos/universal/feature_views.py index 3e05f5d7e5..b93ad987fa 100644 --- a/sdk/python/tests/integration/feature_repos/universal/feature_views.py +++ b/sdk/python/tests/integration/feature_repos/universal/feature_views.py @@ -14,8 +14,13 @@ ValueType, ) from feast.data_source import DataSource, RequestSource -from feast.types import Array, FeastType, Float32, Float64, Int32 -from tests.integration.feature_repos.universal.entities import location +from feast.types import Array, FeastType, Float32, Float64, Int32, Int64 +from tests.integration.feature_repos.universal.entities import ( + customer, + driver, + item, + location, +) def driver_feature_view( @@ -23,12 +28,14 @@ def driver_feature_view( name="test_correctness", infer_features: bool = False, dtype: FeastType = Float32, - entities: Optional[List[str]] = None, + entity_type: FeastType = Int64, ) -> FeatureView: + d = driver() return FeatureView( name=name, - entities=entities or ["driver"], - schema=None if infer_features else [Field(name="value", dtype=dtype)], + entities=[d], + schema=[Field(name=d.join_key, dtype=entity_type)] + + ([] if infer_features else [Field(name="value", dtype=dtype)]), ttl=timedelta(days=5), source=data_source, ) @@ -139,7 +146,7 @@ def create_similarity_request_source(): def create_item_embeddings_feature_view(source, infer_features: bool = False): item_embeddings_feature_view = FeatureView( name="item_embeddings", - entities=["item"], + entities=[item()], schema=None if infer_features else [ @@ -157,7 +164,7 @@ def create_item_embeddings_batch_feature_view( ) -> BatchFeatureView: item_embeddings_feature_view = BatchFeatureView( name="item_embeddings", - entities=["item"], + entities=[item()], schema=None if infer_features else [ @@ -171,15 +178,19 @@ def create_item_embeddings_batch_feature_view( def create_driver_hourly_stats_feature_view(source, infer_features: bool = False): + # TODO(felixwang9817): Figure out why not adding an entity field here + # breaks type tests. + d = driver() driver_stats_feature_view = FeatureView( name="driver_stats", - entities=["driver"], + entities=[d], schema=None if infer_features else [ Field(name="conv_rate", dtype=Float32), Field(name="acc_rate", dtype=Float32), Field(name="avg_daily_trips", dtype=Int32), + Field(name=d.join_key, dtype=Int64), ], source=source, ttl=timedelta(hours=2), @@ -192,7 +203,7 @@ def create_driver_hourly_stats_batch_feature_view( ) -> BatchFeatureView: driver_stats_feature_view = BatchFeatureView( name="driver_stats", - entities=["driver"], + entities=[driver()], schema=None if infer_features else [ @@ -209,7 +220,7 @@ def create_driver_hourly_stats_batch_feature_view( def create_customer_daily_profile_feature_view(source, infer_features: bool = False): customer_profile_feature_view = FeatureView( name="customer_profile", - entities=["customer_id"], + entities=[customer()], schema=None if infer_features else [ @@ -243,10 +254,13 @@ def create_global_stats_feature_view(source, infer_features: bool = False): def create_order_feature_view(source, infer_features: bool = False): return FeatureView( name="order", - entities=["driver", "customer_id"], + entities=[customer(), driver()], schema=None if infer_features - else [Field(name="order_is_success", dtype=Int32)], + else [ + Field(name="order_is_success", dtype=Int32), + Field(name="driver_id", dtype=Int64), + ], source=source, ttl=timedelta(days=2), ) @@ -256,7 +270,12 @@ def create_location_stats_feature_view(source, infer_features: bool = False): location_stats_feature_view = FeatureView( name="location_stats", entities=[location()], - schema=None if infer_features else [Field(name="temperature", dtype=Int32)], + schema=None + if infer_features + else [ + Field(name="temperature", dtype=Int32), + Field(name="location_id", dtype=Int64), + ], source=source, ttl=timedelta(days=2), ) @@ -280,9 +299,11 @@ def create_pushable_feature_view(batch_source: DataSource): ) return FeatureView( name="pushable_location_stats", - entities=["location_id"], - # Test that Features still work for FeatureViews. - features=[Feature(name="temperature", dtype=ValueType.INT32)], + entities=[location()], + schema=[ + Field(name="temperature", dtype=Int32), + Field(name="location_id", dtype=Int64), + ], ttl=timedelta(days=2), source=push_source, ) diff --git a/sdk/python/tests/integration/offline_store/test_feature_logging.py b/sdk/python/tests/integration/offline_store/test_feature_logging.py index 24c65eac02..a6f8e56de7 100644 --- a/sdk/python/tests/integration/offline_store/test_feature_logging.py +++ b/sdk/python/tests/integration/offline_store/test_feature_logging.py @@ -16,7 +16,11 @@ from tests.integration.feature_repos.repo_configuration import ( construct_universal_feature_views, ) -from tests.integration.feature_repos.universal.entities import driver +from tests.integration.feature_repos.universal.entities import ( + customer, + driver, + location, +) from tests.integration.feature_repos.universal.feature_views import conv_rate_plus_100 from tests.utils.logged_features import prepare_logs, to_logs_dataset @@ -30,7 +34,7 @@ def test_feature_service_logging(environment, universal_data_sources, pass_as_pa (_, datasets, data_sources) = universal_data_sources feature_views = construct_universal_feature_views(data_sources) - store.apply([driver(), *feature_views.values()]) + store.apply([customer(), driver(), location(), *feature_views.values()]) feature_service = FeatureService( name="test_service", diff --git a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py index d5f49a1f95..2076ab2aed 100644 --- a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py +++ b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py @@ -22,7 +22,6 @@ DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL, ) from feast.types import Float32, Int32 -from feast.value_type import ValueType from tests.integration.feature_repos.repo_configuration import ( construct_universal_feature_views, table_name_from_data_source, @@ -728,10 +727,10 @@ def test_historical_features_from_bigquery_sources_containing_backfills(environm created_timestamp_column="created", ) - driver = Entity(name="driver", join_keys=["driver_id"], value_type=ValueType.INT64) + driver = Entity(name="driver", join_keys=["driver_id"]) driver_fv = FeatureView( name="driver_stats", - entities=["driver"], + entities=[driver], schema=[Field(name="avg_daily_trips", dtype=Int32)], batch_source=driver_stats_data_source, ttl=None, diff --git a/sdk/python/tests/integration/online_store/test_universal_online.py b/sdk/python/tests/integration/online_store/test_universal_online.py index b3115dcb3d..4afcd61c70 100644 --- a/sdk/python/tests/integration/online_store/test_universal_online.py +++ b/sdk/python/tests/integration/online_store/test_universal_online.py @@ -13,7 +13,7 @@ import requests from botocore.exceptions import BotoCoreError -from feast import Entity, FeatureService, FeatureView, Field, ValueType +from feast import Entity, FeatureService, FeatureView, Field from feast.errors import ( FeatureNameCollisionError, RequestDataNotFoundInEntityRowsException, @@ -117,13 +117,13 @@ def test_write_to_online_store_event_check(environment): } dataframe_source = pd.DataFrame(data) with prep_file_source(df=dataframe_source, timestamp_field="ts_1") as file_source: - e = Entity(name="id", value_type=ValueType.STRING) + e = Entity(name="id") # Create Feature View fv1 = FeatureView( name="feature_view_123", schema=[Field(name="string_col", dtype=String)], - entities=["id"], + entities=[e], batch_source=file_source, ttl=timedelta(minutes=5), ) diff --git a/sdk/python/tests/integration/registration/test_feature_store.py b/sdk/python/tests/integration/registration/test_feature_store.py index db4c6700ce..88a4b9f249 100644 --- a/sdk/python/tests/integration/registration/test_feature_store.py +++ b/sdk/python/tests/integration/registration/test_feature_store.py @@ -27,10 +27,8 @@ from feast.infra.offline_stores.file import FileOfflineStoreConfig from feast.infra.online_stores.dynamodb import DynamoDBOnlineStoreConfig from feast.infra.online_stores.sqlite import SqliteOnlineStoreConfig -from feast.protos.feast.types import Value_pb2 as ValueProto from feast.repo_config import RepoConfig from feast.types import Array, Bytes, Float64, Int64, String -from feast.value_type import ValueType from tests.utils.data_source_utils import ( prep_file_source, simple_bq_source_using_query_arg, @@ -93,10 +91,7 @@ def feature_store_with_s3_registry(): ) def test_apply_entity_success(test_feature_store): entity = Entity( - name="driver_car_id", - description="Car driver id", - value_type=ValueType.STRING, - tags={"team": "matchmaking"}, + name="driver_car_id", description="Car driver id", tags={"team": "matchmaking"}, ) # Register Entity @@ -108,7 +103,6 @@ def test_apply_entity_success(test_feature_store): assert ( len(entities) == 1 and entity.name == "driver_car_id" - and entity.value_type == ValueType(ValueProto.ValueType.STRING) and entity.description == "Car driver id" and "team" in entity.tags and entity.tags["team"] == "matchmaking" @@ -127,10 +121,7 @@ def test_apply_entity_success(test_feature_store): ) def test_apply_entity_integration(test_feature_store): entity = Entity( - name="driver_car_id", - description="Car driver id", - value_type=ValueType.STRING, - tags={"team": "matchmaking"}, + name="driver_car_id", description="Car driver id", tags={"team": "matchmaking"}, ) # Register Entity @@ -142,7 +133,6 @@ def test_apply_entity_integration(test_feature_store): assert ( len(entities) == 1 and entity.name == "driver_car_id" - and entity.value_type == ValueType(ValueProto.ValueType.STRING) and entity.description == "Car driver id" and "team" in entity.tags and entity.tags["team"] == "matchmaking" @@ -151,7 +141,6 @@ def test_apply_entity_integration(test_feature_store): entity = test_feature_store.get_entity("driver_car_id") assert ( entity.name == "driver_car_id" - and entity.value_type == ValueType(ValueProto.ValueType.STRING) and entity.description == "Car driver id" and "team" in entity.tags and entity.tags["team"] == "matchmaking" @@ -173,6 +162,8 @@ def test_apply_feature_view_success(test_feature_store): date_partition_column="date_partition_col", ) + entity = Entity(name="fs1_my_entity_1", join_keys=["entity_id"]) + fv1 = FeatureView( name="my_feature_view_1", schema=[ @@ -180,15 +171,16 @@ def test_apply_feature_view_success(test_feature_store): Field(name="fs1_my_feature_2", dtype=String), Field(name="fs1_my_feature_3", dtype=Array(String)), Field(name="fs1_my_feature_4", dtype=Array(Bytes)), + Field(name="entity_id", dtype=Int64), ], - entities=["fs1_my_entity_1"], + entities=[entity], tags={"team": "matchmaking"}, batch_source=batch_source, ttl=timedelta(minutes=5), ) # Register Feature View - test_feature_store.apply([fv1]) + test_feature_store.apply([entity, fv1]) feature_views = test_feature_store.list_feature_views() @@ -217,13 +209,11 @@ def test_apply_feature_view_success(test_feature_store): @pytest.mark.parametrize("dataframe_source", [lazy_fixture("simple_dataset_1")]) def test_feature_view_inference_success(test_feature_store, dataframe_source): with prep_file_source(df=dataframe_source, timestamp_field="ts_1") as file_source: - entity = Entity( - name="id", join_keys=["id_join_key"], value_type=ValueType.INT64 - ) + entity = Entity(name="id", join_keys=["id_join_key"]) fv1 = FeatureView( name="fv1", - entities=["id"], + entities=[entity], ttl=timedelta(minutes=5), online=True, batch_source=file_source, @@ -232,7 +222,7 @@ def test_feature_view_inference_success(test_feature_store, dataframe_source): fv2 = FeatureView( name="fv2", - entities=["id"], + entities=[entity], ttl=timedelta(minutes=5), online=True, batch_source=simple_bq_source_using_table_arg(dataframe_source, "ts_1"), @@ -241,7 +231,7 @@ def test_feature_view_inference_success(test_feature_store, dataframe_source): fv3 = FeatureView( name="fv3", - entities=["id"], + entities=[entity], ttl=timedelta(minutes=5), online=True, batch_source=simple_bq_source_using_query_arg(dataframe_source, "ts_1"), @@ -296,6 +286,8 @@ def test_apply_feature_view_integration(test_feature_store): date_partition_column="date_partition_col", ) + entity = Entity(name="fs1_my_entity_1", join_keys=["test"]) + fv1 = FeatureView( name="my_feature_view_1", schema=[ @@ -303,15 +295,16 @@ def test_apply_feature_view_integration(test_feature_store): Field(name="fs1_my_feature_2", dtype=String), Field(name="fs1_my_feature_3", dtype=Array(String)), Field(name="fs1_my_feature_4", dtype=Array(Bytes)), + Field(name="test", dtype=Int64), ], - entities=["fs1_my_entity_1"], + entities=[entity], tags={"team": "matchmaking"}, batch_source=batch_source, ttl=timedelta(minutes=5), ) # Register Feature View - test_feature_store.apply([fv1]) + test_feature_store.apply([fv1, entity]) feature_views = test_feature_store.list_feature_views() @@ -364,13 +357,9 @@ def test_apply_object_and_read(test_feature_store): created_timestamp_column="timestamp", ) - e1 = Entity( - name="fs1_my_entity_1", value_type=ValueType.STRING, description="something" - ) + e1 = Entity(name="fs1_my_entity_1", description="something") - e2 = Entity( - name="fs1_my_entity_2", value_type=ValueType.STRING, description="something" - ) + e2 = Entity(name="fs1_my_entity_2", description="something") fv1 = FeatureView( name="my_feature_view_1", @@ -379,8 +368,9 @@ def test_apply_object_and_read(test_feature_store): Field(name="fs1_my_feature_2", dtype=String), Field(name="fs1_my_feature_3", dtype=Array(String)), Field(name="fs1_my_feature_4", dtype=Array(Bytes)), + Field(name="fs1_my_entity_1", dtype=Int64), ], - entities=["fs1_my_entity_1"], + entities=[e1], tags={"team": "matchmaking"}, batch_source=batch_source, ttl=timedelta(minutes=5), @@ -393,8 +383,9 @@ def test_apply_object_and_read(test_feature_store): Field(name="fs1_my_feature_2", dtype=String), Field(name="fs1_my_feature_3", dtype=Array(String)), Field(name="fs1_my_feature_4", dtype=Array(Bytes)), + Field(name="fs1_my_entity_2", dtype=Int64), ], - entities=["fs1_my_entity_1"], + entities=[e2], tags={"team": "matchmaking"}, batch_source=batch_source, ttl=timedelta(minutes=5), @@ -406,7 +397,6 @@ def test_apply_object_and_read(test_feature_store): fv1_actual = test_feature_store.get_feature_view("my_feature_view_1") e1_actual = test_feature_store.get_entity("fs1_my_entity_1") - assert fv1 == fv1_actual assert e1 == e1_actual assert fv2 != fv1_actual assert e2 != e1_actual @@ -434,13 +424,13 @@ def test_apply_remote_repo(): def test_reapply_feature_view_success(test_feature_store, dataframe_source): with prep_file_source(df=dataframe_source, timestamp_field="ts_1") as file_source: - e = Entity(name="id", join_keys=["id_join_key"], value_type=ValueType.STRING) + e = Entity(name="id", join_keys=["id_join_key"]) # Create Feature View fv1 = FeatureView( name="my_feature_view_1", schema=[Field(name="string_col", dtype=String)], - entities=["id"], + entities=[e], batch_source=file_source, ttl=timedelta(minutes=5), ) @@ -470,7 +460,7 @@ def test_reapply_feature_view_success(test_feature_store, dataframe_source): fv1 = FeatureView( name="my_feature_view_1", schema=[Field(name="int64_col", dtype=Int64)], - entities=["id"], + entities=[e], batch_source=file_source, ttl=timedelta(minutes=5), ) @@ -485,10 +475,12 @@ def test_reapply_feature_view_success(test_feature_store, dataframe_source): def test_apply_conflicting_featureview_names(feature_store_with_local_registry): """Test applying feature views with non-case-insensitively unique names""" + driver = Entity(name="driver", join_keys=["driver_id"]) + customer = Entity(name="customer", join_keys=["customer_id"]) driver_stats = FeatureView( name="driver_hourly_stats", - entities=["driver_id"], + entities=[driver], ttl=timedelta(seconds=10), online=False, batch_source=FileSource(path="driver_stats.parquet"), @@ -497,7 +489,7 @@ def test_apply_conflicting_featureview_names(feature_store_with_local_registry): customer_stats = FeatureView( name="DRIVER_HOURLY_STATS", - entities=["id"], + entities=[customer], ttl=timedelta(seconds=10), online=False, batch_source=FileSource(path="customer_stats.parquet"), diff --git a/sdk/python/tests/integration/registration/test_inference.py b/sdk/python/tests/integration/registration/test_inference.py index 6cf49c31db..c298c0e4f6 100644 --- a/sdk/python/tests/integration/registration/test_inference.py +++ b/sdk/python/tests/integration/registration/test_inference.py @@ -24,14 +24,13 @@ from feast.field import Field from feast.inference import ( update_data_sources_with_inferred_event_timestamp_col, - update_entities_with_inferred_types_from_feature_views, - update_feature_views_with_inferred_features, + update_feature_views_with_inferred_features_and_entities, ) from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import ( SparkSource, ) from feast.on_demand_feature_view import on_demand_feature_view -from feast.types import Float32, String, UnixTimestamp +from feast.types import Float32, Float64, Int64, String, UnixTimestamp from tests.utils.data_source_utils import ( prep_file_source, simple_bq_source_using_query_arg, @@ -39,47 +38,6 @@ ) -def test_update_entities_with_inferred_types_from_feature_views( - simple_dataset_1, simple_dataset_2 -): - with prep_file_source( - df=simple_dataset_1, timestamp_field="ts_1" - ) as file_source, prep_file_source( - df=simple_dataset_2, timestamp_field="ts_1" - ) as file_source_2: - - fv1 = FeatureView( - name="fv1", entities=["id"], batch_source=file_source, ttl=None, - ) - fv2 = FeatureView( - name="fv2", entities=["id"], batch_source=file_source_2, ttl=None, - ) - - actual_1 = Entity(name="id", join_keys=["id_join_key"]) - actual_2 = Entity(name="id", join_keys=["id_join_key"]) - - update_entities_with_inferred_types_from_feature_views( - [actual_1], [fv1], RepoConfig(provider="local", project="test") - ) - update_entities_with_inferred_types_from_feature_views( - [actual_2], [fv2], RepoConfig(provider="local", project="test") - ) - assert actual_1 == Entity( - name="id", join_keys=["id_join_key"], value_type=ValueType.INT64 - ) - assert actual_2 == Entity( - name="id", join_keys=["id_join_key"], value_type=ValueType.STRING - ) - - with pytest.raises(RegistryInferenceFailure): - # two viable data types - update_entities_with_inferred_types_from_feature_views( - [Entity(name="id", join_keys=["id_join_key"])], - [fv1, fv2], - RepoConfig(provider="local", project="test"), - ) - - def test_infer_datasource_names_file(): file_path = "path/to/test.csv" data_source = FileSource(path=file_path) @@ -287,7 +245,10 @@ def test_view_with_missing_feature(features_df: pd.DataFrame) -> pd.DataFrame: test_view_with_missing_feature.infer_features() -def test_update_feature_views_with_inferred_features(): +def test_feature_view_inference_respects_basic_inference(): + """ + Tests that feature view inference respects the basic inference that occurs during creation. + """ file_source = FileSource(name="test", path="test path") entity1 = Entity(name="test1", join_keys=["test_column_1"]) entity2 = Entity(name="test2", join_keys=["test_column_2"]) @@ -312,26 +273,132 @@ def test_update_feature_views_with_inferred_features(): ) assert len(feature_view_1.schema) == 2 - assert len(feature_view_1.features) == 2 + assert len(feature_view_1.features) == 1 + assert len(feature_view_1.entity_columns) == 1 - # The entity field should be deleted from the schema and features of the feature view. - update_feature_views_with_inferred_features( + update_feature_views_with_inferred_features_and_entities( [feature_view_1], [entity1], RepoConfig(provider="local", project="test") ) - assert len(feature_view_1.schema) == 1 + assert len(feature_view_1.schema) == 2 assert len(feature_view_1.features) == 1 + assert len(feature_view_1.entity_columns) == 1 assert len(feature_view_2.schema) == 3 - assert len(feature_view_2.features) == 3 + assert len(feature_view_2.features) == 1 + assert len(feature_view_2.entity_columns) == 2 - # The entity fields should be deleted from the schema and features of the feature view. - update_feature_views_with_inferred_features( + update_feature_views_with_inferred_features_and_entities( [feature_view_2], [entity1, entity2], RepoConfig(provider="local", project="test"), ) - assert len(feature_view_2.schema) == 1 + assert len(feature_view_2.schema) == 3 assert len(feature_view_2.features) == 1 + assert len(feature_view_2.entity_columns) == 2 + + +def test_feature_view_inference_on_entity_columns(simple_dataset_1): + """ + Tests that feature view inference correctly infers entity columns. + """ + with prep_file_source(df=simple_dataset_1, timestamp_field="ts_1") as file_source: + entity1 = Entity(name="test1", join_keys=["id_join_key"]) + feature_view_1 = FeatureView( + name="test1", + entities=[entity1], + schema=[Field(name="int64_col", dtype=Int64)], + source=file_source, + ) + + assert len(feature_view_1.schema) == 1 + assert len(feature_view_1.features) == 1 + assert len(feature_view_1.entity_columns) == 0 + + update_feature_views_with_inferred_features_and_entities( + [feature_view_1], [entity1], RepoConfig(provider="local", project="test") + ) + + # The schema is only used as a parameter, as is therefore not updated during inference. + assert len(feature_view_1.schema) == 1 + + # Since there is already a feature specified, additional features are not inferred. + assert len(feature_view_1.features) == 1 + + # The single entity column is inferred correctly. + assert len(feature_view_1.entity_columns) == 1 + + +def test_feature_view_inference_respects_entity_value_type(simple_dataset_1): + """ + Tests that feature view inference still respects an entity's value type. + """ + # TODO(felixwang9817): Remove this test once entity value_type is removed. + with prep_file_source(df=simple_dataset_1, timestamp_field="ts_1") as file_source: + entity1 = Entity( + name="test1", join_keys=["id_join_key"], value_type=ValueType.STRING + ) + feature_view_1 = FeatureView( + name="test1", + entities=[entity1], + schema=[Field(name="int64_col", dtype=Int64)], + source=file_source, + ) + + assert len(feature_view_1.schema) == 1 + assert len(feature_view_1.features) == 1 + assert len(feature_view_1.entity_columns) == 0 + + update_feature_views_with_inferred_features_and_entities( + [feature_view_1], [entity1], RepoConfig(provider="local", project="test") + ) + + # The schema is only used as a parameter, as is therefore not updated during inference. + assert len(feature_view_1.schema) == 1 + + # Since there is already a feature specified, additional features are not inferred. + assert len(feature_view_1.features) == 1 + + # The single entity column is inferred correctly and has type String. + assert len(feature_view_1.entity_columns) == 1 + assert feature_view_1.entity_columns[0].dtype == String + + +def test_feature_view_inference_on_feature_columns(simple_dataset_1): + """ + Tests that feature view inference correctly infers feature columns. + """ + with prep_file_source(df=simple_dataset_1, timestamp_field="ts_1") as file_source: + entity1 = Entity(name="test1", join_keys=["id_join_key"]) + feature_view_1 = FeatureView( + name="test1", + entities=[entity1], + schema=[Field(name="id_join_key", dtype=Int64)], + source=file_source, + ) + + assert len(feature_view_1.schema) == 1 + assert len(feature_view_1.features) == 0 + assert len(feature_view_1.entity_columns) == 1 + + update_feature_views_with_inferred_features_and_entities( + [feature_view_1], [entity1], RepoConfig(provider="local", project="test") + ) + + # The schema is only used as a parameter, as is therefore not updated during inference. + assert len(feature_view_1.schema) == 1 + + # All three feature columns are inferred correctly. + assert len(feature_view_1.features) == 3 + print(feature_view_1.features) + feature_column_1 = Field(name="float_col", dtype=Float64) + feature_column_2 = Field(name="int64_col", dtype=Int64) + feature_column_3 = Field(name="string_col", dtype=String) + assert feature_column_1 in feature_view_1.features + assert feature_column_2 in feature_view_1.features + assert feature_column_3 in feature_view_1.features + + # The single entity column remains. + assert len(feature_view_1.entity_columns) == 1 def test_update_feature_services_with_inferred_features(simple_dataset_1): @@ -344,13 +411,16 @@ def test_update_feature_services_with_inferred_features(simple_dataset_1): assert len(feature_service.feature_view_projections) == 1 assert len(feature_service.feature_view_projections[0].features) == 0 - update_feature_views_with_inferred_features( + update_feature_views_with_inferred_features_and_entities( [feature_view_1], [entity1], RepoConfig(provider="local", project="test") ) feature_service.infer_features( fvs_to_update={feature_view_1.name: feature_view_1} ) - assert len(feature_view_1.schema) == 3 + assert len(feature_view_1.schema) == 0 assert len(feature_view_1.features) == 3 assert len(feature_service.feature_view_projections[0].features) == 3 + + +# TODO(felixwang9817): Add tests that interact with field mapping. diff --git a/sdk/python/tests/integration/registration/test_registry.py b/sdk/python/tests/integration/registration/test_registry.py index 46e9a19544..bb02f9a9e3 100644 --- a/sdk/python/tests/integration/registration/test_registry.py +++ b/sdk/python/tests/integration/registration/test_registry.py @@ -26,7 +26,6 @@ from feast.feature_view import FeatureView from feast.field import Field from feast.on_demand_feature_view import RequestSource, on_demand_feature_view -from feast.protos.feast.types import Value_pb2 as ValueProto from feast.registry import Registry from feast.repo_config import RegistryConfig from feast.types import Array, Bytes, Float32, Int32, Int64, String @@ -73,10 +72,7 @@ def s3_registry() -> Registry: ) def test_apply_entity_success(test_registry): entity = Entity( - name="driver_car_id", - description="Car driver id", - value_type=ValueType.STRING, - tags={"team": "matchmaking"}, + name="driver_car_id", description="Car driver id", tags={"team": "matchmaking"}, ) project = "project" @@ -90,7 +86,6 @@ def test_apply_entity_success(test_registry): assert ( len(entities) == 1 and entity.name == "driver_car_id" - and entity.value_type == ValueType(ValueProto.ValueType.STRING) and entity.description == "Car driver id" and "team" in entity.tags and entity.tags["team"] == "matchmaking" @@ -99,7 +94,6 @@ def test_apply_entity_success(test_registry): entity = test_registry.get_entity("driver_car_id", project) assert ( entity.name == "driver_car_id" - and entity.value_type == ValueType(ValueProto.ValueType.STRING) and entity.description == "Car driver id" and "team" in entity.tags and entity.tags["team"] == "matchmaking" @@ -122,10 +116,7 @@ def test_apply_entity_success(test_registry): ) def test_apply_entity_integration(test_registry): entity = Entity( - name="driver_car_id", - description="Car driver id", - value_type=ValueType.STRING, - tags={"team": "matchmaking"}, + name="driver_car_id", description="Car driver id", tags={"team": "matchmaking"}, ) project = "project" @@ -139,7 +130,6 @@ def test_apply_entity_integration(test_registry): assert ( len(entities) == 1 and entity.name == "driver_car_id" - and entity.value_type == ValueType(ValueProto.ValueType.STRING) and entity.description == "Car driver id" and "team" in entity.tags and entity.tags["team"] == "matchmaking" @@ -148,7 +138,6 @@ def test_apply_entity_integration(test_registry): entity = test_registry.get_entity("driver_car_id", project) assert ( entity.name == "driver_car_id" - and entity.value_type == ValueType(ValueProto.ValueType.STRING) and entity.description == "Car driver id" and "team" in entity.tags and entity.tags["team"] == "matchmaking" @@ -173,6 +162,8 @@ def test_apply_feature_view_success(test_registry): created_timestamp_column="timestamp", ) + entity = Entity(name="fs1_my_entity_1", join_keys=["test"]) + fv1 = FeatureView( name="my_feature_view_1", schema=[ @@ -181,7 +172,7 @@ def test_apply_feature_view_success(test_registry): Field(name="fs1_my_feature_3", dtype=Array(String)), Field(name="fs1_my_feature_4", dtype=Array(Bytes)), ], - entities=["fs1_my_entity_1"], + entities=[entity], tags={"team": "matchmaking"}, batch_source=batch_source, ttl=timedelta(minutes=5), @@ -327,10 +318,12 @@ def test_modify_feature_views_success(test_registry, request_source_schema): request_source = RequestSource(name="request_source", schema=request_source_schema,) + entity = Entity(name="fs1_my_entity_1", join_keys=["test"]) + fv1 = FeatureView( name="my_feature_view_1", schema=[Field(name="fs1_my_feature_1", dtype=Int64)], - entities=["fs1_my_entity_1"], + entities=[entity], tags={"team": "matchmaking"}, batch_source=batch_source, ttl=timedelta(minutes=5), @@ -443,6 +436,8 @@ def test_apply_feature_view_integration(test_registry): created_timestamp_column="timestamp", ) + entity = Entity(name="fs1_my_entity_1", join_keys=["test"]) + fv1 = FeatureView( name="my_feature_view_1", schema=[ @@ -451,7 +446,7 @@ def test_apply_feature_view_integration(test_registry): Field(name="fs1_my_feature_3", dtype=Array(String)), Field(name="fs1_my_feature_4", dtype=Array(Bytes)), ], - entities=["fs1_my_entity_1"], + entities=[entity], tags={"team": "matchmaking"}, batch_source=batch_source, ttl=timedelta(minutes=5), @@ -518,6 +513,8 @@ def test_apply_data_source(test_registry: Registry): created_timestamp_column="timestamp", ) + entity = Entity(name="fs1_my_entity_1", join_keys=["test"]) + fv1 = FeatureView( name="my_feature_view_1", schema=[ @@ -526,7 +523,7 @@ def test_apply_data_source(test_registry: Registry): Field(name="fs1_my_feature_3", dtype=Array(String)), Field(name="fs1_my_feature_4", dtype=Array(Bytes)), ], - entities=["fs1_my_entity_1"], + entities=[entity], tags={"team": "matchmaking"}, batch_source=batch_source, ttl=timedelta(minutes=5), @@ -573,10 +570,7 @@ def test_commit(): test_registry = Registry(registry_config, None) entity = Entity( - name="driver_car_id", - description="Car driver id", - value_type=ValueType.STRING, - tags={"team": "matchmaking"}, + name="driver_car_id", description="Car driver id", tags={"team": "matchmaking"}, ) project = "project" @@ -591,7 +585,6 @@ def test_commit(): assert ( len(entities) == 1 and entity.name == "driver_car_id" - and entity.value_type == ValueType(ValueProto.ValueType.STRING) and entity.description == "Car driver id" and "team" in entity.tags and entity.tags["team"] == "matchmaking" @@ -600,7 +593,6 @@ def test_commit(): entity = test_registry.get_entity("driver_car_id", project, allow_cache=True) assert ( entity.name == "driver_car_id" - and entity.value_type == ValueType(ValueProto.ValueType.STRING) and entity.description == "Car driver id" and "team" in entity.tags and entity.tags["team"] == "matchmaking" @@ -626,7 +618,6 @@ def test_commit(): assert ( len(entities) == 1 and entity.name == "driver_car_id" - and entity.value_type == ValueType(ValueProto.ValueType.STRING) and entity.description == "Car driver id" and "team" in entity.tags and entity.tags["team"] == "matchmaking" @@ -635,7 +626,6 @@ def test_commit(): entity = test_registry.get_entity("driver_car_id", project) assert ( entity.name == "driver_car_id" - and entity.value_type == ValueType(ValueProto.ValueType.STRING) and entity.description == "Car driver id" and "team" in entity.tags and entity.tags["team"] == "matchmaking" diff --git a/sdk/python/tests/integration/registration/test_universal_types.py b/sdk/python/tests/integration/registration/test_universal_types.py index 6d016e3e85..b03303f6ee 100644 --- a/sdk/python/tests/integration/registration/test_universal_types.py +++ b/sdk/python/tests/integration/registration/test_universal_types.py @@ -8,10 +8,17 @@ import pyarrow as pa import pytest -from feast.entity import Entity from feast.infra.offline_stores.offline_store import RetrievalJob -from feast.types import Array, Bool, Float32, Int32, Int64, UnixTimestamp -from feast.value_type import ValueType +from feast.types import ( + Array, + Bool, + Float32, + Float64, + Int32, + Int64, + String, + UnixTimestamp, +) from tests.data.data_creator import create_dataset from tests.integration.feature_repos.universal.entities import driver from tests.integration.feature_repos.universal.feature_views import driver_feature_view @@ -89,10 +96,7 @@ def get_fixtures(request, environment): ).lower() config = request.param df = create_dataset( - ValueType.INT64, - config.feature_dtype, - config.feature_is_list, - config.has_empty_list, + Int64, config.feature_dtype, config.feature_is_list, config.has_empty_list, ) data_source = environment.data_source_creator.create_data_source( df, destination_name=destination_name, field_mapping={"ts_1": "ts"}, @@ -110,18 +114,11 @@ def get_fixtures(request, environment): @pytest.mark.integration @pytest.mark.universal_offline_stores -@pytest.mark.parametrize( - "entity_type", [ValueType.INT32, ValueType.INT64, ValueType.STRING] -) +@pytest.mark.parametrize("entity_type", [Int32, Int64, String]) def test_entity_inference_types_match(environment, entity_type): fs = environment.feature_store # Don't specify value type in entity to force inference - entity = Entity( - name=f"driver_{entity_type.name.lower()}", - value_type=ValueType.UNKNOWN, - join_key="driver_id", - ) df = create_dataset(entity_type, feature_dtype="int32",) data_source = environment.data_source_creator.create_data_source( df, @@ -134,20 +131,30 @@ def test_entity_inference_types_match(environment, entity_type): feature_is_list=False, has_empty_list=False, data_source=data_source, - entity=entity.name, + entity_type=entity_type, ) + + # TODO(felixwang9817): Refactor this by finding a better way to force type inference. + # Override the schema and entity_columns to force entity inference. + entity = driver() + fv.schema = list(filter(lambda x: x.name != entity.join_key, fv.schema)) + fv.entity_columns = [] fs.apply([fv, entity]) - inferred_entity = fs.get_entity(entity.name) entity_type_to_expected_inferred_entity_type = { - ValueType.INT32: {ValueType.INT32, ValueType.INT64}, - ValueType.INT64: {ValueType.INT32, ValueType.INT64}, - ValueType.FLOAT: {ValueType.DOUBLE}, - ValueType.STRING: {ValueType.STRING}, + Int32: {Int32, Int64}, + Int64: {Int32, Int64}, + Float32: {Float64}, + String: {String}, } + + entity_columns = list( + filter(lambda x: x.name == entity.join_key, fv.entity_columns) + ) + assert len(entity_columns) == 1 + entity_column = entity_columns[0] assert ( - inferred_entity.value_type - in entity_type_to_expected_inferred_entity_type[entity_type] + entity_column.dtype in entity_type_to_expected_inferred_entity_type[entity_type] ) @@ -263,7 +270,12 @@ def test_feature_get_online_features_types_match( def create_feature_view( - name, feature_dtype, feature_is_list, has_empty_list, data_source, entity="driver" + name, + feature_dtype, + feature_is_list, + has_empty_list, + data_source, + entity_type=Int64, ): if feature_is_list is True: if feature_dtype == "int32": @@ -288,7 +300,9 @@ def create_feature_view( elif feature_dtype == "datetime": dtype = UnixTimestamp - return driver_feature_view(data_source, name=name, dtype=dtype, entities=[entity]) + return driver_feature_view( + data_source, name=name, dtype=dtype, entity_type=entity_type + ) def assert_expected_historical_feature_types( diff --git a/sdk/python/tests/integration/scaffolding/test_partial_apply.py b/sdk/python/tests/integration/scaffolding/test_partial_apply.py index 3ab9bf196f..e5a7206b96 100644 --- a/sdk/python/tests/integration/scaffolding/test_partial_apply.py +++ b/sdk/python/tests/integration/scaffolding/test_partial_apply.py @@ -2,7 +2,7 @@ import pytest -from feast import BigQuerySource, FeatureView, Field +from feast import BigQuerySource, Entity, FeatureView, Field from feast.types import Float32, String from tests.utils.cli_utils import CliRunner, get_example_repo from tests.utils.online_read_write_test import basic_rw_test @@ -19,6 +19,7 @@ def test_partial() -> None: with runner.local_repo( get_example_repo("example_feature_repo_1.py"), "bigquery" ) as store: + driver = Entity(name="driver", join_keys=["test"]) driver_locations_source = BigQuerySource( table="feast-oss.public.drivers", @@ -28,12 +29,13 @@ def test_partial() -> None: driver_locations_100 = FeatureView( name="driver_locations_100", - entities=["driver"], + entities=[driver], ttl=timedelta(days=1), schema=[ Field(name="lat", dtype=Float32), Field(name="lon", dtype=String), Field(name="name", dtype=String), + Field(name="test", dtype=String), ], online=True, batch_source=driver_locations_source, diff --git a/sdk/python/tests/unit/diff/test_registry_diff.py b/sdk/python/tests/unit/diff/test_registry_diff.py index 483dae73e2..ae10c834c8 100644 --- a/sdk/python/tests/unit/diff/test_registry_diff.py +++ b/sdk/python/tests/unit/diff/test_registry_diff.py @@ -2,34 +2,36 @@ diff_registry_objects, tag_objects_for_keep_delete_update_add, ) +from feast.entity import Entity from feast.feature_view import FeatureView from tests.utils.data_source_utils import prep_file_source def test_tag_objects_for_keep_delete_update_add(simple_dataset_1): with prep_file_source(df=simple_dataset_1, timestamp_field="ts_1") as file_source: + entity = Entity(name="id", join_keys=["id"]) to_delete = FeatureView( - name="to_delete", entities=["id"], batch_source=file_source, ttl=None, + name="to_delete", entities=[entity], batch_source=file_source, ttl=None, ) unchanged_fv = FeatureView( - name="fv1", entities=["id"], batch_source=file_source, ttl=None, + name="fv1", entities=[entity], batch_source=file_source, ttl=None, ) pre_changed = FeatureView( name="fv2", - entities=["id"], + entities=[entity], batch_source=file_source, ttl=None, tags={"when": "before"}, ) post_changed = FeatureView( name="fv2", - entities=["id"], + entities=[entity], batch_source=file_source, ttl=None, tags={"when": "after"}, ) to_add = FeatureView( - name="to_add", entities=["id"], batch_source=file_source, ttl=None, + name="to_add", entities=[entity], batch_source=file_source, ttl=None, ) keep, delete, update, add = tag_objects_for_keep_delete_update_add( @@ -52,16 +54,17 @@ def test_tag_objects_for_keep_delete_update_add(simple_dataset_1): def test_diff_registry_objects_feature_views(simple_dataset_1): with prep_file_source(df=simple_dataset_1, timestamp_field="ts_1") as file_source: + entity = Entity(name="id", join_keys=["id"]) pre_changed = FeatureView( name="fv2", - entities=["id"], + entities=[entity], batch_source=file_source, ttl=None, tags={"when": "before"}, ) post_changed = FeatureView( name="fv2", - entities=["id"], + entities=[entity], batch_source=file_source, ttl=None, tags={"when": "after"}, diff --git a/sdk/python/tests/unit/infra/test_provider.py b/sdk/python/tests/unit/infra/test_provider.py index 43c09760e9..5ed5603b03 100644 --- a/sdk/python/tests/unit/infra/test_provider.py +++ b/sdk/python/tests/unit/infra/test_provider.py @@ -20,14 +20,13 @@ from feast.field import Field from feast.infra.provider import _get_column_names from feast.types import String -from feast.value_type import ValueType def test_get_column_names_preserves_feature_ordering(): - entity = Entity("my-entity", description="My entity", value_type=ValueType.STRING) + entity = Entity("my-entity", description="My entity") fv = FeatureView( name="my-fv", - entities=["my-entity"], + entities=[entity], ttl=timedelta(days=1), batch_source=BigQuerySource(table="non-existent-mock"), schema=[ diff --git a/sdk/python/tests/unit/test_entity.py b/sdk/python/tests/unit/test_entity.py index 254a975f67..04a857ddef 100644 --- a/sdk/python/tests/unit/test_entity.py +++ b/sdk/python/tests/unit/test_entity.py @@ -20,19 +20,14 @@ def test_join_key_default(): with pytest.deprecated_call(): - entity = Entity( - "my-entity", description="My entity", value_type=ValueType.STRING - ) + entity = Entity("my-entity", description="My entity") assert entity.join_key == "my-entity" def test_entity_class_contains_tags(): with pytest.deprecated_call(): entity = Entity( - "my-entity", - description="My entity", - value_type=ValueType.STRING, - tags={"key1": "val1", "key2": "val2"}, + "my-entity", description="My entity", tags={"key1": "val1", "key2": "val2"}, ) assert "key1" in entity.tags.keys() and entity.tags["key1"] == "val1" assert "key2" in entity.tags.keys() and entity.tags["key2"] == "val2" @@ -40,20 +35,18 @@ def test_entity_class_contains_tags(): def test_entity_without_tags_empty_dict(): with pytest.deprecated_call(): - entity = Entity( - "my-entity", description="My entity", value_type=ValueType.STRING - ) + entity = Entity("my-entity", description="My entity") assert entity.tags == dict() assert len(entity.tags) == 0 def test_entity_without_description(): with pytest.deprecated_call(): - Entity("my-entity", value_type=ValueType.STRING) + Entity("my-entity") def test_name_not_specified(): - assertpy.assert_that(lambda: Entity(value_type=ValueType.STRING)).raises(ValueError) + assertpy.assert_that(lambda: Entity()).raises(ValueError) def test_multiple_args(): @@ -61,15 +54,19 @@ def test_multiple_args(): def test_name_keyword(recwarn): - Entity(name="my-entity", value_type=ValueType.STRING) + Entity(name="my-entity") assert len(recwarn) == 0 + Entity(name="my-entity", join_key="test") + assert len(recwarn) == 1 + Entity(name="my-entity", join_keys=["test"]) + assert len(recwarn) == 1 def test_hash(): - entity1 = Entity(name="my-entity", value_type=ValueType.STRING) - entity2 = Entity(name="my-entity", value_type=ValueType.STRING) - entity3 = Entity(name="my-entity", value_type=ValueType.FLOAT) - entity4 = Entity(name="my-entity", value_type=ValueType.FLOAT, description="test") + entity1 = Entity(name="my-entity") + entity2 = Entity(name="my-entity") + entity3 = Entity(name="my-entity", join_keys=["not-my-entity"]) + entity4 = Entity(name="my-entity", join_keys=["not-my-entity"], description="test") s1 = {entity1, entity2} assert len(s1) == 1 diff --git a/sdk/python/tests/unit/test_feature_view.py b/sdk/python/tests/unit/test_feature_view.py index 80a583806e..1ef36081ec 100644 --- a/sdk/python/tests/unit/test_feature_view.py +++ b/sdk/python/tests/unit/test_feature_view.py @@ -62,3 +62,7 @@ def test_hash(): s4 = {feature_view_1, feature_view_2, feature_view_3, feature_view_4} assert len(s4) == 3 + + +# TODO(felixwang9817): Add tests for proto conversion. +# TODO(felixwang9817): Add tests for field mapping logic. diff --git a/sdk/python/tests/unit/test_unit_feature_store.py b/sdk/python/tests/unit/test_unit_feature_store.py index 6f9dd6acb0..0c13dffa62 100644 --- a/sdk/python/tests/unit/test_unit_feature_store.py +++ b/sdk/python/tests/unit/test_unit_feature_store.py @@ -17,7 +17,7 @@ class MockFeatureView: projection: MockFeatureViewProjection -def test__get_unique_entities(): +def test_get_unique_entities(): entity_values = { "entity_1": [Value(int64_val=1), Value(int64_val=2), Value(int64_val=1)], "entity_2": [ diff --git a/sdk/python/tests/utils/online_write_benchmark.py b/sdk/python/tests/utils/online_write_benchmark.py index 82ffc8e98b..9f2f8ba60d 100644 --- a/sdk/python/tests/utils/online_write_benchmark.py +++ b/sdk/python/tests/utils/online_write_benchmark.py @@ -17,13 +17,13 @@ from feast.infra.provider import _convert_arrow_to_proto from feast.repo_config import RepoConfig from feast.types import Float32, Int32 -from feast.value_type import ValueType def create_driver_hourly_stats_feature_view(source): + driver = Entity(name="driver", join_keys=["driver_id"]) driver_stats_feature_view = FeatureView( name="driver_stats", - entities=["driver_id"], + entities=[driver], schema=[ Field(name="conv_rate", dtype=Float32), Field(name="acc_rate", dtype=Float32), @@ -61,7 +61,7 @@ def benchmark_writes(): # This is just to set data source to something, we're not reading from parquet source here. parquet_path = os.path.join(temp_dir, "data.parquet") - driver = Entity(name="driver_id", value_type=ValueType.INT64) + driver = Entity(name="driver_id") table = create_driver_hourly_stats_feature_view( create_driver_hourly_stats_source(parquet_path=parquet_path) )