Skip to content

Commit 489a0f8

Browse files
Change internal references from input to batch_source (#1729)
* Change internal references from input to batch_source
  Signed-off-by: Felix Wang <wangfelix98@gmail.com>
* Add deprecation warning
  Signed-off-by: Felix Wang <wangfelix98@gmail.com>
* Change deprecation release to 0.12
  Signed-off-by: Felix Wang <wangfelix98@gmail.com>
* Change deprecation release to 0.13
  Signed-off-by: Felix Wang <wangfelix98@gmail.com>
1 parent 47f30b3 commit 489a0f8

28 files changed

+108 -88 lines changed

docs/concepts/feature-view.md

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -14,7 +14,7 @@ driver_stats_fv = FeatureView(
1414
Feature(name="trips_today", dtype=ValueType.INT64),
1515
Feature(name="rating", dtype=ValueType.FLOAT),
1616
],
17-
input=BigQuerySource(
17+
batch_source=BigQuerySource(
1818
table_ref="feast-oss.demo_data.driver_activity"
1919
)
2020
)

docs/concepts/feature-views.md

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -108,10 +108,10 @@ driver_stats_fv = FeatureView(
108108
Feature(name="avg_daily_trips", dtype=ValueType.INT64),
109109
],
110110

111-
# Inputs are used to find feature values. In the case of this feature
111+
# Batch sources are used to find feature values. In the case of this feature
112112
# view we will query a source table on BigQuery for driver statistics
113113
# features
114-
input=driver_stats_source,
114+
batch_source=driver_stats_source,
115115

116116
# Tags are user defined key/value pairs that are attached to each
117117
# feature view

docs/reference/feature-repository.md

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -111,7 +111,7 @@ driver_locations = FeatureView(
111111
Feature(name="lat", dtype=ValueType.FLOAT),
112112
Feature(name="lon", dtype=ValueType.STRING),
113113
],
114-
input=driver_locations_source,
114+
batch_source=driver_locations_source,
115115
)
116116
```
117117
{% endcode %}

docs/reference/feature-repository/README.md

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -111,7 +111,7 @@ driver_locations = FeatureView(
111111
Feature(name="lat", dtype=ValueType.FLOAT),
112112
Feature(name="lon", dtype=ValueType.STRING),
113113
],
114-
input=driver_locations_source,
114+
batch_source=driver_locations_source,
115115
)
116116
```
117117
{% endcode %}

sdk/python/feast/feature_store.py

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -261,7 +261,7 @@ def apply(
261261
>>> name="customer_fv",
262262
>>> entities=["customer"],
263263
>>> features=[Feature(name="age", dtype=ValueType.INT64)],
264-
>>> input=FileSource(path="file.parquet", event_timestamp_column="timestamp"),
264+
>>> batch_source=FileSource(path="file.parquet", event_timestamp_column="timestamp"),
265265
>>> ttl=timedelta(days=1)
266266
>>> )
267267
>>> fs.apply([customer_entity, customer_feature_view])
@@ -284,11 +284,11 @@ def apply(
284284
)
285285

286286
update_data_sources_with_inferred_event_timestamp_col(
287-
[view.input for view in views_to_update], self.config
287+
[view.batch_source for view in views_to_update], self.config
288288
)
289289

290290
for view in views_to_update:
291-
view.infer_features_from_input_source(self.config)
291+
view.infer_features_from_batch_source(self.config)
292292

293293
if len(views_to_update) + len(entities_to_update) + len(
294294
services_to_update

sdk/python/feast/feature_view.py

Lines changed: 29 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -12,6 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414
import re
15+
import warnings
1516
from datetime import datetime, timedelta
1617
from typing import Dict, List, Optional, Tuple, Union
1718

@@ -38,6 +39,8 @@
3839
from feast.usage import log_exceptions
3940
from feast.value_type import ValueType
4041

42+
warnings.simplefilter("once", DeprecationWarning)
43+
4144

4245
class FeatureView:
4346
"""
@@ -51,7 +54,7 @@ class FeatureView:
5154
ttl: Optional[timedelta]
5255
online: bool
5356
input: DataSource
54-
batch_source: Optional[DataSource] = None
57+
batch_source: DataSource
5558
stream_source: Optional[DataSource] = None
5659
created_timestamp: Optional[Timestamp] = None
5760
last_updated_timestamp: Optional[Timestamp] = None
@@ -63,13 +66,21 @@ def __init__(
6366
name: str,
6467
entities: List[str],
6568
ttl: Optional[Union[Duration, timedelta]],
66-
input: DataSource,
69+
input: Optional[DataSource] = None,
6770
batch_source: Optional[DataSource] = None,
6871
stream_source: Optional[DataSource] = None,
6972
features: List[Feature] = None,
7073
tags: Optional[Dict[str, str]] = None,
7174
online: bool = True,
7275
):
76+
warnings.warn(
77+
(
78+
"The argument 'input' is being deprecated. Please use 'batch_source' "
79+
"instead. Feast 0.13 and onwards will not support the argument 'input'."
80+
),
81+
DeprecationWarning,
82+
)
83+
7384
_input = input or batch_source
7485
assert _input is not None
7586

@@ -139,7 +150,7 @@ def __eq__(self, other):
139150
return False
140151
if sorted(self.features) != sorted(other.features):
141152
return False
142-
if self.input != other.input:
153+
if self.batch_source != other.batch_source:
143154
return False
144155
if self.stream_source != other.stream_source:
145156
return False
@@ -182,10 +193,8 @@ def to_proto(self) -> FeatureViewProto:
182193
ttl_duration = Duration()
183194
ttl_duration.FromTimedelta(self.ttl)
184195

185-
batch_source_proto = self.input.to_proto()
186-
batch_source_proto.data_source_class_type = (
187-
f"{self.input.__class__.__module__}.{self.input.__class__.__name__}"
188-
)
196+
batch_source_proto = self.batch_source.to_proto()
197+
batch_source_proto.data_source_class_type = f"{self.batch_source.__class__.__module__}.{self.batch_source.__class__.__name__}"
189198

190199
stream_source_proto = None
191200
if self.stream_source:
@@ -217,7 +226,7 @@ def from_proto(cls, feature_view_proto: FeatureViewProto):
217226
Returns a FeatureViewProto object based on the feature view protobuf
218227
"""
219228

220-
_input = DataSource.from_proto(feature_view_proto.spec.batch_source)
229+
batch_source = DataSource.from_proto(feature_view_proto.spec.batch_source)
221230
stream_source = (
222231
DataSource.from_proto(feature_view_proto.spec.stream_source)
223232
if feature_view_proto.spec.HasField("stream_source")
@@ -242,8 +251,8 @@ def from_proto(cls, feature_view_proto: FeatureViewProto):
242251
and feature_view_proto.spec.ttl.nanos == 0
243252
else feature_view_proto.spec.ttl
244253
),
245-
input=_input,
246-
batch_source=_input,
254+
input=batch_source,
255+
batch_source=batch_source,
247256
stream_source=stream_source,
248257
)
249258

@@ -265,29 +274,30 @@ def most_recent_end_time(self) -> Optional[datetime]:
265274
return None
266275
return max([interval[1] for interval in self.materialization_intervals])
267276

268-
def infer_features_from_input_source(self, config: RepoConfig):
277+
def infer_features_from_batch_source(self, config: RepoConfig):
269278
if not self.features:
270279
columns_to_exclude = {
271-
self.input.event_timestamp_column,
272-
self.input.created_timestamp_column,
280+
self.batch_source.event_timestamp_column,
281+
self.batch_source.created_timestamp_column,
273282
} | set(self.entities)
274283

275-
for col_name, col_datatype in self.input.get_table_column_names_and_types(
276-
config
277-
):
284+
for (
285+
col_name,
286+
col_datatype,
287+
) in self.batch_source.get_table_column_names_and_types(config):
278288
if col_name not in columns_to_exclude and not re.match(
279289
"^__|__$",
280290
col_name, # double underscores often signal an internal-use column
281291
):
282292
feature_name = (
283-
self.input.field_mapping[col_name]
284-
if col_name in self.input.field_mapping.keys()
293+
self.batch_source.field_mapping[col_name]
294+
if col_name in self.batch_source.field_mapping.keys()
285295
else col_name
286296
)
287297
self.features.append(
288298
Feature(
289299
feature_name,
290-
self.input.source_datatype_to_feast_value_type()(
300+
self.batch_source.source_datatype_to_feast_value_type()(
291301
col_datatype
292302
),
293303
)

sdk/python/feast/inference.py

Lines changed: 5 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -13,7 +13,7 @@ def update_entities_with_inferred_types_from_feature_views(
1313
entities: List[Entity], feature_views: List[FeatureView], config: RepoConfig
1414
) -> None:
1515
"""
16-
Infer entity value type by examining schema of feature view input sources
16+
Infer entity value type by examining schema of feature view batch sources
1717
"""
1818
incomplete_entities = {
1919
entity.name: entity
@@ -26,22 +26,22 @@ def update_entities_with_inferred_types_from_feature_views(
2626
if not (incomplete_entities_keys & set(view.entities)):
2727
continue # skip if view doesn't contain any entities that need inference
2828

29-
col_names_and_types = view.input.get_table_column_names_and_types(config)
29+
col_names_and_types = view.batch_source.get_table_column_names_and_types(config)
3030
for entity_name in view.entities:
3131
if entity_name in incomplete_entities:
32-
# get entity information from information extracted from the view input source
32+
# get entity information from information extracted from the view batch source
3333
extracted_entity_name_type_pairs = list(
3434
filter(lambda tup: tup[0] == entity_name, col_names_and_types)
3535
)
3636
if len(extracted_entity_name_type_pairs) == 0:
3737
# Doesn't mention inference error because would also be an error without inferencing
3838
raise ValueError(
39-
f"""No column in the input source for the {view.name} feature view matches
39+
f"""No column in the batch source for the {view.name} feature view matches
4040
its entity's name."""
4141
)
4242

4343
entity = incomplete_entities[entity_name]
44-
inferred_value_type = view.input.source_datatype_to_feast_value_type()(
44+
inferred_value_type = view.batch_source.source_datatype_to_feast_value_type()(
4545
extracted_entity_name_type_pairs[0][1]
4646
)
4747

sdk/python/feast/infra/aws.py

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -99,7 +99,7 @@ def materialize_single_feature_view(
9999

100100
offline_job = self.offline_store.pull_latest_from_table_or_query(
101101
config=config,
102-
data_source=feature_view.input,
102+
data_source=feature_view.batch_source,
103103
join_key_columns=join_key_columns,
104104
feature_name_columns=feature_name_columns,
105105
event_timestamp_column=event_timestamp_column,
@@ -110,8 +110,8 @@ def materialize_single_feature_view(
110110

111111
table = offline_job.to_arrow()
112112

113-
if feature_view.input.field_mapping is not None:
114-
table = _run_field_mapping(table, feature_view.input.field_mapping)
113+
if feature_view.batch_source.field_mapping is not None:
114+
table = _run_field_mapping(table, feature_view.batch_source.field_mapping)
115115

116116
join_keys = [entity.join_key for entity in entities]
117117
rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys)

sdk/python/feast/infra/gcp.py

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -102,7 +102,7 @@ def materialize_single_feature_view(
102102

103103
offline_job = self.offline_store.pull_latest_from_table_or_query(
104104
config=config,
105-
data_source=feature_view.input,
105+
data_source=feature_view.batch_source,
106106
join_key_columns=join_key_columns,
107107
feature_name_columns=feature_name_columns,
108108
event_timestamp_column=event_timestamp_column,
@@ -112,8 +112,8 @@ def materialize_single_feature_view(
112112
)
113113
table = offline_job.to_arrow()
114114

115-
if feature_view.input.field_mapping is not None:
116-
table = _run_field_mapping(table, feature_view.input.field_mapping)
115+
if feature_view.batch_source.field_mapping is not None:
116+
table = _run_field_mapping(table, feature_view.batch_source.field_mapping)
117117

118118
join_keys = [entity.join_key for entity in entities]
119119
rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys)

sdk/python/feast/infra/local.py

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -100,7 +100,7 @@ def materialize_single_feature_view(
100100
) = _get_column_names(feature_view, entities)
101101

102102
offline_job = self.offline_store.pull_latest_from_table_or_query(
103-
data_source=feature_view.input,
103+
data_source=feature_view.batch_source,
104104
join_key_columns=join_key_columns,
105105
feature_name_columns=feature_name_columns,
106106
event_timestamp_column=event_timestamp_column,
@@ -111,8 +111,8 @@ def materialize_single_feature_view(
111111
)
112112
table = offline_job.to_arrow()
113113

114-
if feature_view.input.field_mapping is not None:
115-
table = _run_field_mapping(table, feature_view.input.field_mapping)
114+
if feature_view.batch_source.field_mapping is not None:
115+
table = _run_field_mapping(table, feature_view.batch_source.field_mapping)
116116

117117
join_keys = [entity.join_key for entity in entities]
118118
rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys)

0 commit comments

Comments
 (0)