From 63ab5757063023b1391f88083a4a318d96d57d85 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Fri, 15 Apr 2022 21:48:08 -0700 Subject: [PATCH 1/3] feat: Create stream and batch feature view abstractions Signed-off-by: Achal Shah --- sdk/python/feast/batch_feature_view.py | 61 ++++++++++++++++++ sdk/python/feast/stream_feature_view.py | 57 +++++++++++++++++ sdk/python/tests/unit/test_feature_views.py | 70 +++++++++++++++++++++ 3 files changed, 188 insertions(+) create mode 100644 sdk/python/feast/batch_feature_view.py create mode 100644 sdk/python/feast/stream_feature_view.py create mode 100644 sdk/python/tests/unit/test_feature_views.py diff --git a/sdk/python/feast/batch_feature_view.py b/sdk/python/feast/batch_feature_view.py new file mode 100644 index 00000000000..7b2057898d0 --- /dev/null +++ b/sdk/python/feast/batch_feature_view.py @@ -0,0 +1,61 @@ +from datetime import timedelta +from typing import Dict, List, Optional, Union + +from google.protobuf.duration_pb2 import Duration + +from feast import Entity, Feature, Field +from feast.data_source import DataSource +from feast.feature_view import FeatureView +from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto + +SUPPORTED_BATCH_SOURCES = { + "BigQuerySource", + "FileSource", + "RedshiftSource", + "SnowflakeSource", + "SparkSource", + "TrinoSource", +} + + +class BatchFeatureView(FeatureView): + def __init__( + self, + *, + name: Optional[str] = None, + entities: Optional[Union[List[Entity], List[str]]] = None, + ttl: Optional[Union[Duration, timedelta]] = None, + features: Optional[List[Feature]] = None, + tags: Optional[Dict[str, str]] = None, + online: bool = True, + description: str = "", + owner: str = "", + schema: Optional[List[Field]] = None, + source: Optional[DataSource] = None, + ): + + if source is None: + raise ValueError("Feature views need a source specified") + if ( + type(source).__name__ not in SUPPORTED_BATCH_SOURCES + and source.to_proto().type != DataSourceProto.SourceType.CUSTOM_SOURCE + ): + raise ValueError( + f"Batch feature views need a batch source, expected one of {SUPPORTED_BATCH_SOURCES} " + f"or CUSTOM_SOURCE, got {type(source).__name__}: {source.name} instead " + ) + + super().__init__( + name=name, + entities=entities, + ttl=ttl, + batch_source=None, + stream_source=None, + features=features, + tags=tags, + online=online, + description=description, + owner=owner, + schema=schema, + source=source, + ) diff --git a/sdk/python/feast/stream_feature_view.py b/sdk/python/feast/stream_feature_view.py new file mode 100644 index 00000000000..d5fdb61695c --- /dev/null +++ b/sdk/python/feast/stream_feature_view.py @@ -0,0 +1,57 @@ +from datetime import timedelta +from typing import Dict, List, Optional, Union + +from google.protobuf.duration_pb2 import Duration + +from feast import Entity, Feature, Field +from feast.data_source import DataSource +from feast.feature_view import FeatureView +from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto + +SUPPORTED_STREAM_SOURCES = { + "KafkaSource", + "KinesisSource", +} + + +class StreamFeatureView(FeatureView): + def __init__( + self, + *, + name: Optional[str] = None, + entities: Optional[Union[List[Entity], List[str]]] = None, + ttl: Optional[Union[Duration, timedelta]] = None, + features: Optional[List[Feature]] = None, + tags: Optional[Dict[str, str]] = None, + online: bool = True, + description: str = "", + owner: str = "", + schema: Optional[List[Field]] = None, + source: Optional[DataSource] = None, + ): + + if source is None: + raise ValueError("Feature views need a source specified") + if ( + type(source).__name__ not in SUPPORTED_STREAM_SOURCES + and source.to_proto().type != DataSourceProto.SourceType.CUSTOM_SOURCE + ): + raise ValueError( + f"Stream feature views need a stream source, expected one of {SUPPORTED_STREAM_SOURCES} " + f"or CUSTOM_SOURCE, got {type(source).__name__}: {source.name} instead " + ) + + super().__init__( + name=name, + entities=entities, + ttl=ttl, + batch_source=None, + stream_source=None, + features=features, + tags=tags, + online=online, + description=description, + owner=owner, + schema=schema, + source=source, + ) diff --git a/sdk/python/tests/unit/test_feature_views.py b/sdk/python/tests/unit/test_feature_views.py new file mode 100644 index 00000000000..04be8aa758a --- /dev/null +++ b/sdk/python/tests/unit/test_feature_views.py @@ -0,0 +1,70 @@ +from datetime import timedelta + +import pytest + +from feast import KafkaSource +from feast.batch_feature_view import BatchFeatureView +from feast.data_format import AvroFormat +from feast.infra.offline_stores.file_source import FileSource +from feast.stream_feature_view import StreamFeatureView + + +def test_create_batch_feature_view(): + batch_source = FileSource("some path") + BatchFeatureView( + name="test batch feature view", + entities=[], + ttl=timedelta(days=30), + source=batch_source, + ) + + with pytest.raises(ValueError): + BatchFeatureView( + name="test batch feature view", entities=[], ttl=timedelta(days=30) + ) + + stream_source = KafkaSource( + name="kafka", + event_timestamp_column="", + bootstrap_servers="", + message_format=AvroFormat(""), + topic="topic", + batch_source=FileSource("some path"), + ) + with pytest.raises(ValueError): + BatchFeatureView( + name="test batch feature view", + entities=[], + ttl=timedelta(days=30), + source=stream_source, + ) + + +def test_create_stream_feature_view(): + stream_source = KafkaSource( + name="kafka", + event_timestamp_column="", + bootstrap_servers="", + message_format=AvroFormat(""), + topic="topic", + batch_source=FileSource("some path"), + ) + StreamFeatureView( + name="test batch feature view", + entities=[], + ttl=timedelta(days=30), + source=stream_source, + ) + + with pytest.raises(ValueError): + StreamFeatureView( + name="test batch feature view", entities=[], ttl=timedelta(days=30) + ) + + with pytest.raises(ValueError): + StreamFeatureView( + name="test batch feature view", + entities=[], + ttl=timedelta(days=30), + source=FileSource("some path"), + ) From 2f01c0ed858404d37d6d6468307c3067083a6735 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Mon, 18 Apr 2022 10:25:37 -0700 Subject: [PATCH 2/3] CR Signed-off-by: Achal Shah --- CONTRIBUTING.md | 1 + sdk/python/feast/batch_feature_view.py | 9 +++------ sdk/python/feast/stream_feature_view.py | 9 +++------ sdk/python/tests/unit/test_feature_views.py | 2 +- 4 files changed, 8 insertions(+), 13 deletions(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index cb17012eea3..de242fea540 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -176,6 +176,7 @@ The services with containerized replacements currently implemented are: - Datastore - DynamoDB - Redis +- Trino You can run `make test-python-integration-container` to run tests against the containerized versions of dependencies. diff --git a/sdk/python/feast/batch_feature_view.py b/sdk/python/feast/batch_feature_view.py index 7b2057898d0..2f9fb080dbd 100644 --- a/sdk/python/feast/batch_feature_view.py +++ b/sdk/python/feast/batch_feature_view.py @@ -1,11 +1,10 @@ from datetime import timedelta from typing import Dict, List, Optional, Union -from google.protobuf.duration_pb2 import Duration - -from feast import Entity, Feature, Field from feast.data_source import DataSource +from feast.entity import Entity from feast.feature_view import FeatureView +from feast.field import Field from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto SUPPORTED_BATCH_SOURCES = { @@ -24,8 +23,7 @@ def __init__( *, name: Optional[str] = None, entities: Optional[Union[List[Entity], List[str]]] = None, - ttl: Optional[Union[Duration, timedelta]] = None, - features: Optional[List[Feature]] = None, + ttl: Optional[timedelta] = None, tags: Optional[Dict[str, str]] = None, online: bool = True, description: str = "", @@ -51,7 +49,6 @@ def __init__( ttl=ttl, batch_source=None, stream_source=None, - features=features, tags=tags, online=online, description=description, diff --git a/sdk/python/feast/stream_feature_view.py b/sdk/python/feast/stream_feature_view.py index d5fdb61695c..1c51b94a7cf 100644 --- a/sdk/python/feast/stream_feature_view.py +++ b/sdk/python/feast/stream_feature_view.py @@ -1,11 +1,10 @@ from datetime import timedelta from typing import Dict, List, Optional, Union -from google.protobuf.duration_pb2 import Duration - -from feast import Entity, Feature, Field from feast.data_source import DataSource +from feast.entity import Entity from feast.feature_view import FeatureView +from feast.field import Field from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto SUPPORTED_STREAM_SOURCES = { @@ -20,8 +19,7 @@ def __init__( *, name: Optional[str] = None, entities: Optional[Union[List[Entity], List[str]]] = None, - ttl: Optional[Union[Duration, timedelta]] = None, - features: Optional[List[Feature]] = None, + ttl: Optional[timedelta] = None, tags: Optional[Dict[str, str]] = None, online: bool = True, description: str = "", @@ -47,7 +45,6 @@ def __init__( ttl=ttl, batch_source=None, stream_source=None, - features=features, tags=tags, online=online, description=description, diff --git a/sdk/python/tests/unit/test_feature_views.py b/sdk/python/tests/unit/test_feature_views.py index 04be8aa758a..a01c744d9d1 100644 --- a/sdk/python/tests/unit/test_feature_views.py +++ b/sdk/python/tests/unit/test_feature_views.py @@ -2,9 +2,9 @@ import pytest -from feast import KafkaSource from feast.batch_feature_view import BatchFeatureView from feast.data_format import AvroFormat +from feast.data_source import KafkaSource from feast.infra.offline_stores.file_source import FileSource from feast.stream_feature_view import StreamFeatureView From 3ba43be62191a926e7e554c49b5b45d50612a0d9 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Mon, 18 Apr 2022 11:16:19 -0700 Subject: [PATCH 3/3] CR Signed-off-by: Achal Shah --- sdk/python/tests/unit/test_feature_views.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sdk/python/tests/unit/test_feature_views.py b/sdk/python/tests/unit/test_feature_views.py index a01c744d9d1..d78788f3aed 100644 --- a/sdk/python/tests/unit/test_feature_views.py +++ b/sdk/python/tests/unit/test_feature_views.py @@ -10,7 +10,7 @@ def test_create_batch_feature_view(): - batch_source = FileSource("some path") + batch_source = FileSource(path="some path") BatchFeatureView( name="test batch feature view", entities=[], @@ -29,7 +29,7 @@ def test_create_batch_feature_view(): bootstrap_servers="", message_format=AvroFormat(""), topic="topic", - batch_source=FileSource("some path"), + batch_source=FileSource(path="some path"), ) with pytest.raises(ValueError): BatchFeatureView( @@ -47,7 +47,7 @@ def test_create_stream_feature_view(): bootstrap_servers="", message_format=AvroFormat(""), topic="topic", - batch_source=FileSource("some path"), + batch_source=FileSource(path="some path"), ) StreamFeatureView( name="test batch feature view", @@ -66,5 +66,5 @@ def test_create_stream_feature_view(): name="test batch feature view", entities=[], ttl=timedelta(days=30), - source=FileSource("some path"), + source=FileSource(path="some path"), )