Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Fix
Signed-off-by: Kevin Zhang <kzhang@tecton.ai>
  • Loading branch information
kevjumba committed Apr 5, 2022
commit 245dc1906697560ee5c3d392502dd30e7c0e44b7
17 changes: 8 additions & 9 deletions sdk/python/feast/data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import warnings
import enum
import warnings
from abc import ABC, abstractmethod
Expand Down Expand Up @@ -183,23 +182,21 @@ class DataSource(ABC):
def __init__(
self,
event_timestamp_column: Optional[str] = None,
timestamp_field: Optional[str] = None,
created_timestamp_column: Optional[str] = None,
field_mapping: Optional[Dict[str, str]] = None,
date_partition_column: Optional[str] = None,
description: Optional[str] = "",
tags: Optional[Dict[str, str]] = None,
owner: Optional[str] = "",
name: Optional[str] = None,
timestamp_field: Optional[str] = None,
):
"""
Creates a DataSource object.
Args:
name: Name of data source, which should be unique within a project
event_timestamp_column (optional): (Deprecated) Event timestamp column used for point in time
joins of feature values.
timestamp_field (optional): Event timestamp field used for point
in time joins of feature values.
created_timestamp_column (optional): Timestamp column indicating when the row
was created, used for deduplicating rows.
field_mapping (optional): A dictionary mapping of column names in this data
Expand All @@ -210,6 +207,8 @@ def __init__(
tags (optional): A dictionary of key-value pairs to store arbitrary metadata.
owner (optional): The owner of the data source, typically the email of the primary
maintainer.
timestamp_field (optional): Event timestamp field used for point
in time joins of feature values.
"""
if not name:
warnings.warn(
Expand Down Expand Up @@ -400,6 +399,7 @@ def __eq__(self, other):
def from_proto(data_source: DataSourceProto):
return KafkaSource(
name=data_source.name,
event_timestamp_column=data_source.timestamp_field,
field_mapping=dict(data_source.field_mapping),
bootstrap_servers=data_source.kafka_options.bootstrap_servers,
message_format=StreamFormat.from_proto(
Expand All @@ -423,7 +423,7 @@ def to_proto(self) -> DataSourceProto:
description=self.description,
tags=self.tags,
owner=self.owner,
)
)

data_source_proto.timestamp_field = self.timestamp_field
data_source_proto.created_timestamp_column = self.created_timestamp_column
Expand Down Expand Up @@ -525,6 +525,7 @@ def get_table_column_names_and_types(
def from_proto(data_source: DataSourceProto):
return KinesisSource(
name=data_source.name,
event_timestamp_column=data_source.timestamp_field,
field_mapping=dict(data_source.field_mapping),
record_format=StreamFormat.from_proto(
data_source.kinesis_options.record_format
Expand Down Expand Up @@ -570,7 +571,7 @@ def __init__(
description=description,
tags=tags,
owner=owner,
timestamp_field=timestamp_field
timestamp_field=timestamp_field,
)
self.kinesis_options = KinesisOptions(
record_format=record_format, region=region, stream_name=stream_name
Expand Down Expand Up @@ -667,9 +668,7 @@ def __init__(
self.timestamp_field = timestamp_field or event_timestamp_column

if not self.timestamp_field:
raise ValueError(
f"timestamp field is needed for push source {self.name}"
)
raise ValueError(f"timestamp field is needed for push source {self.name}")

def validate(self, config: RepoConfig):
pass
Expand Down
9 changes: 2 additions & 7 deletions sdk/python/feast/inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,7 @@ def update_data_sources_with_inferred_event_timestamp_col(
for data_source in data_sources:
if isinstance(data_source, RequestDataSource):
continue
if (
data_source.timestamp_field is None
or data_source.timestamp_field == ""
):
if data_source.timestamp_field is None or data_source.timestamp_field == "":
# prepare right match pattern for data source
ts_column_type_regex_pattern = ""
# TODO(adchia): Move Spark source inference out of this logic
Expand Down Expand Up @@ -171,9 +168,7 @@ def update_feature_views_with_inferred_features(

if fv.batch_source.timestamp_field in fv.batch_source.field_mapping:
columns_to_exclude.add(
fv.batch_source.field_mapping[
fv.batch_source.timestamp_field
]
fv.batch_source.field_mapping[fv.batch_source.timestamp_field]
)
if (
fv.batch_source.created_timestamp_column
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/infra/offline_stores/bigquery_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ def __init__(
description=description,
tags=tags,
owner=owner,
timestamp_field=timestamp_field
timestamp_field=timestamp_field,
)

# Note: Python requires redefining hash in child classes that override __eq__
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def __init__(
description=description,
tags=tags,
owner=owner,
timestamp_field=timestamp_field
timestamp_field=timestamp_field,
)
warnings.warn(
"The spark data source API is an experimental feature in alpha development. "
Expand Down
4 changes: 1 addition & 3 deletions sdk/python/feast/infra/offline_stores/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,7 @@ def evaluate_historical_retrieval():

# Load feature view data from sources and join them incrementally
for feature_view, features in feature_views_to_features.items():
event_timestamp_column = (
feature_view.batch_source.timestamp_field
)
event_timestamp_column = feature_view.batch_source.timestamp_field
created_timestamp_column = (
feature_view.batch_source.created_timestamp_column
)
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/infra/offline_stores/file_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ def from_proto(data_source: DataSourceProto):
field_mapping=dict(data_source.field_mapping),
file_format=FileFormat.from_proto(data_source.file_options.file_format),
path=data_source.file_options.uri,
timesstamp_field=data_source.timestamp_field,
timestamp_field=data_source.timestamp_field,
created_timestamp_column=data_source.created_timestamp_column,
s3_endpoint_override=data_source.file_options.s3_endpoint_override,
description=data_source.description,
Expand Down
4 changes: 0 additions & 4 deletions sdk/python/feast/infra/offline_stores/snowflake_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,8 @@ def __eq__(self, other):
and self.snowflake_options.schema == other.snowflake_options.schema
and self.snowflake_options.table == other.snowflake_options.table
and self.snowflake_options.query == other.snowflake_options.query
<<<<<<< HEAD
and self.snowflake_options.warehouse == other.snowflake_options.warehouse
and self.event_timestamp_column == other.event_timestamp_column
=======
and self.timestamp_field == other.timestamp_field
>>>>>>> 2330b0a8 (Update md files)
and self.created_timestamp_column == other.created_timestamp_column
and self.field_mapping == other.field_mapping
and self.description == other.description
Expand Down
3 changes: 1 addition & 2 deletions sdk/python/tests/example_repos/example_feature_repo_1.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@
)

customer_driver_combined_source = BigQuerySource(
table_ref="feast-oss.public.customer_driver",
timestamp_field="event_timestamp",
table_ref="feast-oss.public.customer_driver", timestamp_field="event_timestamp",
)

driver_locations_push_source = PushSource(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,7 @@ def create_data_source(
**kwargs,
) -> DataSource:
if timestamp_field in df:
df[timestamp_field] = pd.to_datetime(
df[timestamp_field], utc=True
)
df[timestamp_field] = pd.to_datetime(df[timestamp_field], utc=True)
# Make sure the field mapping is correct and convert the datetime datasources.
if field_mapping:
timestamp_mapping = {value: key for key, value in field_mapping.items()}
Expand Down
4 changes: 1 addition & 3 deletions sdk/python/tests/utils/data_source_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,7 @@ def simple_bq_source_using_table_ref_arg(
job = client.load_table_from_dataframe(df, table_ref)
job.result()

return BigQuerySource(
table_ref=table_ref, timestamp_field=event_timestamp_column,
)
return BigQuerySource(table_ref=table_ref, timestamp_field=event_timestamp_column,)


def simple_bq_source_using_query_arg(df, event_timestamp_column=None) -> BigQuerySource:
Expand Down