Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
File source
Signed-off-by: Kevin Zhang <kzhang@tecton.ai>
  • Loading branch information
kevjumba committed Mar 30, 2022
commit 292eb2f283ca09ac6a04ef08dbce1112cd4bf0d1
12 changes: 6 additions & 6 deletions sdk/python/feast/data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -395,14 +395,14 @@ def to_proto(self) -> DataSourceProto:
type=DataSourceProto.STREAM_KAFKA,
field_mapping=self.field_mapping,
kafka_options=self.kafka_options.to_proto(),
description=self.description,
tags=self.tags,
owner=self.owner,
)

data_source_proto.event_timestamp_column = self.event_timestamp_column
data_source_proto.created_timestamp_column = self.created_timestamp_column
data_source_proto.date_partition_column = self.date_partition_column
data_source_proto.description = self.description
data_source_proto.tags = self.tags
data_source_proto.owner = self.owner
return data_source_proto

@staticmethod
Expand Down Expand Up @@ -574,14 +574,14 @@ def to_proto(self) -> DataSourceProto:
type=DataSourceProto.STREAM_KINESIS,
field_mapping=self.field_mapping,
kinesis_options=self.kinesis_options.to_proto(),
description=self.description,
tags=self.tags,
owner=self.owner,
)

data_source_proto.event_timestamp_column = self.event_timestamp_column
data_source_proto.created_timestamp_column = self.created_timestamp_column
data_source_proto.date_partition_column = self.date_partition_column
data_source_proto.description = self.description
data_source_proto.tags = self.tags
data_source_proto.owner = self.owner

return data_source_proto

Expand Down
20 changes: 19 additions & 1 deletion sdk/python/feast/infra/offline_stores/file_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ def __init__(
date_partition_column: Optional[str] = "",
s3_endpoint_override: Optional[str] = None,
name: Optional[str] = "",
description: Optional[str] = "",
tags: Optional[Dict[str, str]] = None,
owner: Optional[str] = "",
):
"""Create a FileSource from a file containing feature data. Only Parquet format supported.

Expand All @@ -42,6 +45,10 @@ def __init__(
date_partition_column (optional): Timestamp column used for partitioning.
s3_endpoint_override (optional): Overrides AWS S3 enpoint with custom S3 storage
name (optional): Name for the file source. Defaults to the path.
description (optional):: A human-readable description.
tags (optional):: A dictionary of key-value pairs to store arbitrary metadata.
owner (optional):: The owner of the file source, typically the email of the primary
maintainer.

Examples:
>>> from feast import FileSource
Expand All @@ -63,6 +70,9 @@ def __init__(
created_timestamp_column,
field_mapping,
date_partition_column,
description=description,
tags=tags,
owner=owner,
)

# Note: Python requires redefining hash in child classes that override __eq__
Expand All @@ -82,6 +92,9 @@ def __eq__(self, other):
and self.field_mapping == other.field_mapping
and self.file_options.s3_endpoint_override
== other.file_options.s3_endpoint_override
and self.description == other.description
and self.tags == other.tags
and self.owner == other.owner
)

@property
Expand All @@ -102,6 +115,9 @@ def from_proto(data_source: DataSourceProto):
created_timestamp_column=data_source.created_timestamp_column,
date_partition_column=data_source.date_partition_column,
s3_endpoint_override=data_source.file_options.s3_endpoint_override,
description=data_source.description,
tags=dict(data_source.tags),
owner=data_source.owner,
)

def to_proto(self) -> DataSourceProto:
Expand All @@ -110,12 +126,14 @@ def to_proto(self) -> DataSourceProto:
type=DataSourceProto.BATCH_FILE,
field_mapping=self.field_mapping,
file_options=self.file_options.to_proto(),
description=self.description,
tags=self.tags,
owner=self.owner,
)

data_source_proto.event_timestamp_column = self.event_timestamp_column
data_source_proto.created_timestamp_column = self.created_timestamp_column
data_source_proto.date_partition_column = self.date_partition_column

return data_source_proto

def validate(self, config: RepoConfig):
Expand Down