Skip to content
Prev Previous commit
Next Next commit
Fix
Signed-off-by: Kevin Zhang <kzhang@tecton.ai>
  • Loading branch information
kevjumba committed Apr 13, 2022
commit 5fd414dc76092956196d5a1c744072b1cd0a51fa
54 changes: 33 additions & 21 deletions sdk/python/feast/data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,18 @@
import re
import warnings
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, Iterable, Optional, Tuple, Union, List
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union

from google.protobuf.json_format import MessageToJson
from numpy import deprecate

from feast import type_map
from feast.data_format import StreamFormat
from feast.field import Field
from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto
from feast.repo_config import RepoConfig, get_data_source_class_from_type
from feast.value_type import ValueType
from numpy import deprecate
from feast.field import Field
from feast.types import VALUE_TYPES_TO_FEAST_TYPES
from feast.value_type import ValueType


class SourceType(enum.Enum):
Expand Down Expand Up @@ -495,10 +495,10 @@ def __eq__(self, other):
"Comparisons should only involve RequestSource class objects."
)
if (
self.name != other.name or
self.description != other.description or
self.owner != other.owner or
self.tags != other.tags
self.name != other.name
or self.description != other.description
or self.owner != other.owner
or self.tags != other.tags
):
return False
else:
Expand All @@ -517,7 +517,7 @@ def __eq__(self, other):
elif isinstance(self.schema, Dict) and isinstance(other.schema, List):
dict_schema = self.schema
list_schema = other.schema
elif isinstance(self.schema, List) and isinstance(other.schema, Dict):
elif isinstance(self.schema, List) and isinstance(other.schema, Dict):
dict_schema = other.schema
list_schema = self.schema

Expand All @@ -537,34 +537,46 @@ def from_proto(data_source: DataSourceProto):
deprecated_schema = data_source.request_data_options.deprecated_schema
schema_pb = data_source.request_data_options.schema

schema = []
if deprecated_schema and not schema_pb:
warnings.warn(
"Schema in RequestSource is changing type. The schema data type Dict[str, ValueType] is being deprecated in Feast 0.23. "
"Please use List[Field] instead for the schema",
DeprecationWarning,
)
dict_schema = {}
for key, val in deprecated_schema.items():
schema[key] = ValueType(val)
dict_schema[key] = ValueType(val)
return RequestSource(
name=data_source.name,
schema=dict_schema,
description=data_source.description,
tags=dict(data_source.tags),
owner=data_source.owner,
)
else:
list_schema = []
for field_proto in schema_pb:
schema.append(Field.from_proto(field_proto))

return RequestSource(
name=data_source.name,
schema=schema,
description=data_source.description,
tags=dict(data_source.tags),
owner=data_source.owner,
)
list_schema.append(Field.from_proto(field_proto))

return RequestSource(
name=data_source.name,
schema=list_schema,
description=data_source.description,
tags=dict(data_source.tags),
owner=data_source.owner,
)

def to_proto(self) -> DataSourceProto:

schema_pb = []

if isinstance(self.schema, Dict):
for key, value in self.schema.items():
schema_pb.append(Field(name=key, dtype=VALUE_TYPES_TO_FEAST_TYPES[value.value]).to_proto())
schema_pb.append(
Field(
name=key, dtype=VALUE_TYPES_TO_FEAST_TYPES[value.value]
).to_proto()
)
else:
for field in self.schema:
schema_pb.append(field.to_proto())
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/field.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from feast.protos.feast.types.Value_pb2 import ValueType
from feast.feature import Feature
from feast.protos.feast.core.Feature_pb2 import FeatureSpecV2 as FieldProto
from feast.protos.feast.types.Value_pb2 import ValueType
from feast.types import FeastType, from_value_type
from feast.value_type import ValueType

Expand Down
8 changes: 7 additions & 1 deletion sdk/python/feast/on_demand_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,13 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto):
def get_request_data_schema(self) -> Dict[str, ValueType]:
schema: Dict[str, ValueType] = {}
for request_source in self.source_request_sources.values():
schema.update(request_source.schema)
if isinstance(request_source.schema, List):
new_schema = {}
for field in request_source.schema:
new_schema[field.name] = field.dtype.to_value_type()
schema.update(new_schema)
elif isinstance(request_source.schema, Dict):
Comment thread
kevjumba marked this conversation as resolved.
Outdated
schema.update(request_source.schema)
return schema

def get_transformed_features_df(
Expand Down
16 changes: 12 additions & 4 deletions sdk/python/feast/request_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,20 @@ def __init__(
DeprecationWarning,
)

if isinstance(request_data_source.schema, Dict):
new_features = [
Field(name=name, dtype=dtype)
for name, dtype in request_data_source.schema.items()
]
else:
new_features = [
Field(name=field.name, dtype=field.dtype.to_value_type())
for field in request_data_source.schema
]

super().__init__(
name=name,
features=[
Field(name=name, dtype=from_value_type(value_type))
for name, value_type in request_data_source.schema.items()
],
features=new_features,
description=description,
tags=tags,
owner=owner,
Expand Down
13 changes: 3 additions & 10 deletions sdk/python/tests/integration/registration/test_inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,13 @@
SparkSource,
)
from feast.on_demand_feature_view import on_demand_feature_view
from feast.types import String, UnixTimestamp
from feast.types import String, UnixTimestamp, PrimitiveFeastType
from tests.utils.data_source_utils import (
prep_file_source,
simple_bq_source_using_query_arg,
simple_bq_source_using_table_arg,
)

from feast.field import Field
from feast.types import PrimitiveFeastType


def test_update_entities_with_inferred_types_from_feature_views(
simple_dataset_1, simple_dataset_2
Expand Down Expand Up @@ -172,9 +169,7 @@ def test_on_demand_features_type_inference():
# Create Feature Views
date_request = RequestSource(
name="date_request",
schema=[
Field(name="some_date", dtype=PrimitiveFeastType.UNIX_TIMESTAMP),
],
schema=[Field(name="some_date", dtype=PrimitiveFeastType.UNIX_TIMESTAMP),],
)

@on_demand_feature_view(
Expand Down Expand Up @@ -234,9 +229,7 @@ def test_datasource_inference():
# Create Feature Views
date_request = RequestSource(
name="date_request",
schema=[
Field(name="some_date", dtype=PrimitiveFeastType.UNIX_TIMESTAMP),
],
schema=[Field(name="some_date", dtype=PrimitiveFeastType.UNIX_TIMESTAMP),],
)

@on_demand_feature_view(
Expand Down
8 changes: 3 additions & 5 deletions sdk/python/tests/integration/registration/test_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@
from feast.protos.feast.types import Value_pb2 as ValueProto
from feast.registry import Registry
from feast.repo_config import RegistryConfig
from feast.types import Array, Bytes, Float32, Int32, Int64, String
from feast.types import Array, Bytes, Float32, Int32, Int64, String, PrimitiveFeastType
from feast.value_type import ValueType
from feast.field import Field
from feast.types import PrimitiveFeastType


@pytest.fixture
def local_registry() -> Registry:
Expand Down Expand Up @@ -249,9 +249,7 @@ def test_modify_feature_views_success(test_registry):

request_source = RequestSource(
name="request_source",
schema=[
Field(name="my_input_1", dtype=PrimitiveFeastType.INT32),
],
schema=[Field(name="my_input_1", dtype=PrimitiveFeastType.INT32),],
)

fv1 = FeatureView(
Expand Down
14 changes: 6 additions & 8 deletions sdk/python/tests/unit/test_data_sources.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
from aiohttp import request
import pytest
from aiohttp import request
from sdk.python.feast.types import ComplexFeastType

from feast import ValueType
from feast.data_source import PushSource, RequestDataSource, RequestSource
from feast.infra.offline_stores.bigquery_source import BigQuerySource
from feast.field import Field
from feast.infra.offline_stores.bigquery_source import BigQuerySource
from feast.types import PrimitiveFeastType
from sdk.python.feast.types import ComplexFeastType


def test_push_with_batch():
push_source = PushSource(
Expand Down Expand Up @@ -35,17 +36,14 @@ def test_request_data_source_deprecation():
returned_request_source = RequestSource.from_proto(request_data_source_proto)
assert returned_request_source == request_data_source


def test_request_source_primitive_type_to_proto():
schema = [
Field(name="f1", dtype=PrimitiveFeastType.FLOAT32),
Field(name="f2", dtype=PrimitiveFeastType.BOOL),
]
request_source = RequestSource(
name="source",
schema=schema,
description="desc",
tags={},
owner="feast",
name="source", schema=schema, description="desc", tags={}, owner="feast",
)
request_proto = request_source.to_proto()
deserialized_request_source = RequestSource.from_proto(request_proto)
Expand Down