Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next commit
Add a streaming source to the FeatureView API
This diff only updates the API. It is currently up to the providers to actually use this information to spin up resources to consume events from the stream sources.

Signed-off-by: Achal Shah <achals@gmail.com>
  • Loading branch information
achals committed Jun 28, 2021
commit 2e0892b3c2f97901e074d7dd9aa8a60447d53efb
44 changes: 39 additions & 5 deletions sdk/python/feast/feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
from google.protobuf.timestamp_pb2 import Timestamp

from feast import utils
from feast.data_source import DataSource
from feast.errors import RegistryInferenceFailure
from feast.data_source import BigQuerySource, DataSource, FileSource, KafkaSource, KinesisSource
from feast.feature import Feature
from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto
from feast.protos.feast.core.FeatureView_pb2 import (
Expand Down Expand Up @@ -50,7 +50,8 @@ class FeatureView:
ttl: Optional[timedelta]
online: bool
input: DataSource

batch_source: Optional[DataSource] = None
Comment thread
woop marked this conversation as resolved.
stream_source: Optional[DataSource] = None
created_timestamp: Optional[Timestamp] = None
last_updated_timestamp: Optional[Timestamp] = None
materialization_intervals: List[Tuple[datetime, datetime]]
Expand All @@ -62,15 +63,46 @@ def __init__(
entities: List[str],
ttl: Optional[Union[Duration, timedelta]],
input: DataSource,
batch_source: Optional[DataSource] = None,
stream_source: Optional[DataSource] = None,
features: List[Feature] = [],
tags: Optional[Dict[str, str]] = None,
online: bool = True,
):
assert input or batch_source
_input = input or batch_source

if not features:
features = [] # to handle python's mutable default arguments
columns_to_exclude = {
_input.event_timestamp_column,
_input.created_timestamp_column,
} | set(entities)

for col_name, col_datatype in _input.get_table_column_names_and_types():
if col_name not in columns_to_exclude and not re.match(
"^__|__$", col_name
):
features.append(
Feature(
col_name,
_input.source_datatype_to_feast_value_type()(col_datatype),
)
)

if not features:
raise ValueError(
f"Could not infer Features for the FeatureView named {name}."
f" Please specify Features explicitly for this FeatureView."
)

cols = [entity for entity in entities] + [feat.name for feat in features]
for col in cols:
if input.field_mapping is not None and col in input.field_mapping.keys():
if _input.field_mapping is not None and col in _input.field_mapping.keys():
raise ValueError(
f"The field {col} is mapped to {input.field_mapping[col]} for this data source. Please either remove this field mapping or use {input.field_mapping[col]} as the Entity or Feature name."
f"The field {col} is mapped to {_input.field_mapping[col]} for this data source. "
f"Please either remove this field mapping or use {_input.field_mapping[col]} as the "
f"Entity or Feature name."
)

self.name = name
Expand All @@ -84,7 +116,9 @@ def __init__(
self.ttl = ttl

self.online = online
self.input = input
self.input = _input
self.batch_source = _input
self.stream_source = stream_source
Comment thread
woop marked this conversation as resolved.
Outdated

self.materialization_intervals = []

Expand Down