Skip to content
Next Next commit
Fix
Signed-off-by: Kevin Zhang <kzhang@tecton.ai>
  • Loading branch information
kevjumba committed Apr 13, 2022
commit 1802ac48166a4af795e6631d64f07b1504dcf592
8 changes: 4 additions & 4 deletions docs/getting-started/concepts/feature-view.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,10 @@ from feast import Field, Float64, RequestSource
# available at request time (e.g. part of the user initiated HTTP request)
input_request = RequestSource(
name="vals_to_add",
schema={
"val_to_add": ValueType.INT64,
"val_to_add_2": ValueType.INT64
}
schema=[
Field(name="val_to_add", dtype=PrimitiveFeastType.INT64),
Field(name="val_to_add_2": dtype=PrimitiveFeastType.INT64),
]
)

# Use the input data and feature view features to create new features
Expand Down
2 changes: 1 addition & 1 deletion docs/how-to-guides/adding-a-new-offline-store.md
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ Finally, the custom data source class can be use in the feature repo to define a
```python
pdriver_hourly_stats = CustomFileDataSource(
path="feature_repo/data/driver_stats.parquet",
event_timestamp_column="event_timestamp",
timestamp_field="event_timestamp",
created_timestamp_column="created",
)

Expand Down
12 changes: 10 additions & 2 deletions examples/java-demo/feature_repo/driver_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from feast.request_feature_view import RequestFeatureView
from feast.types import Float32, Float64, Int64, String
from google.protobuf.duration_pb2 import Duration
from feast.field import Field
from feast.types import PrimitiveFeastType

from feast import Entity, Feature, FeatureView, FileSource, ValueType

Expand Down Expand Up @@ -33,7 +35,10 @@
# available at request time (e.g. part of the user initiated HTTP request)
input_request = RequestSource(
name="vals_to_add",
schema={"val_to_add": ValueType.INT64, "val_to_add_2": ValueType.INT64},
schema=[
Field(name="val_to_add", dtype=PrimitiveFeastType.INT64),
Field(name="val_to_add_2", dtype=PrimitiveFeastType.INT64),
],
)

# Define an on demand feature view which can generate new features based on
Expand All @@ -59,6 +64,9 @@ def transformed_conv_rate(inputs: pd.DataFrame) -> pd.DataFrame:
driver_age_request_fv = RequestFeatureView(
name="driver_age",
request_data_source=RequestSource(
name="driver_age", schema={"driver_age": ValueType.INT64,}
name="driver_age",
schema=[
Field(name="driver_age", dtype=PrimitiveFeastType.INT64),
],
),
)
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@
from feast.feature_view import FeatureView
from feast.field import Field
from feast.on_demand_feature_view import on_demand_feature_view
from feast.types import Float32, Float64, Int64
from feast.types import Float32, Float64, Int64, PrimitiveFeastType
from feast.value_type import ValueType
from google.protobuf.duration_pb2 import Duration
from feast import FileSource

from feast import FileSource

Expand Down Expand Up @@ -42,7 +43,10 @@

input_request = RequestSource(
name="vals_to_add",
schema={"val_to_add": ValueType.INT64, "val_to_add_2": ValueType.INT64},
schema=[
Field(name="val_to_add", dtype=PrimitiveFeastType.INT64),
Field(name="val_to_add_2", dtype=PrimitiveFeastType.INT64),
],
)


Expand Down
6 changes: 5 additions & 1 deletion protos/feast/core/DataSource.proto
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ option java_package = "feast.proto.core";

import "feast/core/DataFormat.proto";
import "feast/types/Value.proto";
import "feast/core/Feature.proto";

// Defines a Data Source that can be used source Feature data
// Next available id: 28
Expand Down Expand Up @@ -212,7 +213,10 @@ message DataSource {
message RequestDataOptions {
reserved 1;
// Mapping of feature name to type
map<string, feast.types.ValueType.Enum> schema = 2;
map<string, feast.types.ValueType.Enum> deprecated_schema = 2;

repeated FeatureSpecV2 schema = 3;

}

// Defines options for DataSource that supports pushing data to it. This allows data to be pushed to
Expand Down
94 changes: 82 additions & 12 deletions sdk/python/feast/data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@
# limitations under the License.

import enum
import re
import warnings
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, Iterable, Optional, Tuple
from typing import Any, Callable, Dict, Iterable, Optional, Tuple, Union, List

from google.protobuf.json_format import MessageToJson

Expand All @@ -24,6 +25,9 @@
from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto
from feast.repo_config import RepoConfig, get_data_source_class_from_type
from feast.value_type import ValueType
from numpy import deprecate
from feast.field import Field
from feast.types import VALUE_TYPES_TO_FEAST_TYPES


class SourceType(enum.Enum):
Expand Down Expand Up @@ -449,26 +453,32 @@ class RequestSource(DataSource):

Args:
name: Name of the request data source
schema: Schema mapping from the input feature name to a ValueType
schema Union[Dict[str, ValueType], List[Field]]: Schema mapping from the input feature name to a ValueType
description (optional): A human-readable description.
tags (optional): A dictionary of key-value pairs to store arbitrary metadata.
owner (optional): The owner of the request data source, typically the email of the primary
maintainer.
"""

name: str
schema: Dict[str, ValueType]
schema: Union[Dict[str, ValueType], List[Field]]
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe the internals should always be using List[Field], and we convert to that in __init__ and from_proto? Or is this field exposed to users directly, and it would break them if we changed it's type?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually looking at the from_proto method we're changing the type of this field anyway

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, removing union.


def __init__(
self,
name: str,
schema: Dict[str, ValueType],
schema: Union[Dict[str, ValueType], List[Field]],
description: Optional[str] = "",
tags: Optional[Dict[str, str]] = None,
owner: Optional[str] = "",
):
"""Creates a RequestSource object."""
super().__init__(name=name, description=description, tags=tags, owner=owner)
if isinstance(schema, Dict):
warnings.warn(
"Schema in RequestSource is changing type. The schema data type Dict[str, ValueType] is being deprecated in Feast 0.23. "
"Please use List[Field] instead for the schema",
DeprecationWarning,
)
self.schema = schema

def validate(self, config: RepoConfig):
Expand All @@ -479,12 +489,67 @@ def get_table_column_names_and_types(
) -> Iterable[Tuple[str, str]]:
pass

def __eq__(self, other):
if not isinstance(other, RequestSource):
raise TypeError(
"Comparisons should only involve RequestSource class objects."
)
if (
self.name != other.name or
self.description != other.description or
self.owner != other.owner or
self.tags != other.tags
):
return False
else:
if isinstance(self.schema, List) and isinstance(other.schema, List):
for field1, field2 in zip(self.schema, other.schema):
if field1 != field2:
return False
return True
elif isinstance(self.schema, Dict) and isinstance(other.schema, Dict):
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should check here that set(self.schema.keys()).symmetric_difference(set(other.schema.keys()) is empty; otherwise this check will incorrectly pass if other.schema has keys that self.schema does not.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated, by just setting schema to list instead.

for key, value in self.schema.items():
if key not in other.schema:
return False
elif value != other.schema[key]:
return False
return True
elif isinstance(self.schema, Dict) and isinstance(other.schema, List):
dict_schema = self.schema
list_schema = other.schema
elif isinstance(self.schema, List) and isinstance(other.schema, Dict):
dict_schema = other.schema
list_schema = self.schema

temp_schema = {}
for field in list_schema:
temp_schema[field.name] = field.dtype
for name, value in dict_schema.items():
if name not in temp_schema:
return False
elif VALUE_TYPES_TO_FEAST_TYPES[value.value] != temp_schema[name]:
return False
return True

@staticmethod
def from_proto(data_source: DataSourceProto):

deprecated_schema = data_source.request_data_options.deprecated_schema
schema_pb = data_source.request_data_options.schema
schema = {}
for key, val in schema_pb.items():
schema[key] = ValueType(val)

schema = []
if deprecated_schema and not schema_pb:
warnings.warn(
"Schema in RequestSource is changing type. The schema data type Dict[str, ValueType] is being deprecated in Feast 0.23. "
"Please use List[Field] instead for the schema",
DeprecationWarning,
)
for key, val in deprecated_schema.items():
schema[key] = ValueType(val)
else:
for field_proto in schema_pb:
schema.append(Field.from_proto(field_proto))

return RequestSource(
name=data_source.name,
schema=schema,
Expand All @@ -494,18 +559,23 @@ def from_proto(data_source: DataSourceProto):
)

def to_proto(self) -> DataSourceProto:
schema_pb = {}
for key, value in self.schema.items():
schema_pb[key] = value.value
options = DataSourceProto.RequestDataOptions(schema=schema_pb)

schema_pb = []

if isinstance(self.schema, Dict):
for key, value in self.schema.items():
schema_pb.append(Field(name=key, dtype=VALUE_TYPES_TO_FEAST_TYPES[value.value]).to_proto())
else:
for field in self.schema:
schema_pb.append(field.to_proto())
data_source_proto = DataSourceProto(
name=self.name,
type=DataSourceProto.REQUEST_SOURCE,
request_data_options=options,
description=self.description,
tags=self.tags,
owner=self.owner,
)
data_source_proto.request_data_options.schema.extend(schema_pb)

return data_source_proto

Expand Down
3 changes: 3 additions & 0 deletions sdk/python/feast/field.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from feast.protos.feast.types.Value_pb2 import ValueType
from feast.feature import Feature
from feast.protos.feast.core.Feature_pb2 import FeatureSpecV2 as FieldProto
from feast.types import FeastType, from_value_type
Expand Down Expand Up @@ -62,6 +63,8 @@ def __str__(self):

def to_proto(self) -> FieldProto:
"""Converts a Field object to its protobuf representation."""
if isinstance(self.dtype, ValueType):
return self.dtype.Enum
value_type = self.dtype.to_value_type()
return FieldProto(name=self.name, value_type=value_type.value)

Expand Down
6 changes: 6 additions & 0 deletions sdk/python/feast/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@ def to_value_type(self) -> ValueType:
def __str__(self):
return PRIMITIVE_FEAST_TYPES_TO_STRING[self.name]

def __eq__(self, other):
return self.value == other.value

def __hash__(self):
return hash((PRIMITIVE_FEAST_TYPES_TO_STRING[self.name]))


Invalid = PrimitiveFeastType.INVALID
Bytes = PrimitiveFeastType.BYTES
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
ValueType,
)
from feast.data_source import DataSource, RequestSource
from feast.types import FeastType
from feast.types import FeastType, PrimitiveFeastType
from tests.integration.feature_repos.universal.entities import location

from feast.field import Field

def driver_feature_view(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this not being used anywhere?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think my rebase must have accidentally deleted it, i added it back.

data_source: DataSource,
Expand Down Expand Up @@ -123,7 +123,11 @@ def similarity_feature_view(


def create_conv_rate_request_source():
return RequestSource(name="conv_rate_input", schema={"val_to_add": ValueType.INT32})
return RequestSource(
name="conv_rate_input",
schema=[
Field(name="val_to_add", dtype=PrimitiveFeastType.INT32),
])


def create_similarity_request_source():
Expand Down
13 changes: 11 additions & 2 deletions sdk/python/tests/integration/registration/test_inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
simple_bq_source_using_table_arg,
)

from feast.field import Field
from feast.types import PrimitiveFeastType


def test_update_entities_with_inferred_types_from_feature_views(
simple_dataset_1, simple_dataset_2
Expand Down Expand Up @@ -168,7 +171,10 @@ def test_update_data_sources_with_inferred_event_timestamp_col(universal_data_so
def test_on_demand_features_type_inference():
# Create Feature Views
date_request = RequestSource(
name="date_request", schema={"some_date": ValueType.UNIX_TIMESTAMP}
name="date_request",
schema=[
Field(name="some_date", dtype=PrimitiveFeastType.UNIX_TIMESTAMP),
],
)

@on_demand_feature_view(
Expand Down Expand Up @@ -227,7 +233,10 @@ def test_view_with_missing_feature(features_df: pd.DataFrame) -> pd.DataFrame:
def test_datasource_inference():
# Create Feature Views
date_request = RequestSource(
name="date_request", schema={"some_date": ValueType.UNIX_TIMESTAMP}
name="date_request",
schema=[
Field(name="some_date", dtype=PrimitiveFeastType.UNIX_TIMESTAMP),
],
)

@on_demand_feature_view(
Expand Down
8 changes: 6 additions & 2 deletions sdk/python/tests/integration/registration/test_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@
from feast.repo_config import RegistryConfig
from feast.types import Array, Bytes, Float32, Int32, Int64, String
from feast.value_type import ValueType

from feast.field import Field
from feast.types import PrimitiveFeastType

@pytest.fixture
def local_registry() -> Registry:
Expand Down Expand Up @@ -247,7 +248,10 @@ def test_modify_feature_views_success(test_registry):
)

request_source = RequestSource(
name="request_source", schema={"my_input_1": ValueType.INT32}
name="request_source",
schema=[
Field(name="my_input_1", dtype=PrimitiveFeastType.INT32),
],
)

fv1 = FeatureView(
Expand Down
Loading