Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Reorder columns in _write_to_offline_store
Signed-off-by: Felix Wang <wangfelix98@gmail.com>
  • Loading branch information
felixwang9817 committed Jun 30, 2022
commit 791322b3bceb8d479bf637a7ccee097df20412ac
30 changes: 28 additions & 2 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import copy
import itertools
import os
import re
import warnings
from collections import Counter, defaultdict
from datetime import datetime, timedelta
Expand Down Expand Up @@ -1420,9 +1421,13 @@ def _write_to_offline_store(
feature_view_name: str,
df: pd.DataFrame,
allow_registry_cache: bool = True,
reorder_columns: bool = True,
):
"""
ingests data directly into the Online store
Persists the dataframe directly into the batch data source for the given feature view.

Fails if the dataframe columns do not match the columns of the batch data source. Optionally
reorders the columns of the dataframe to match.
"""
# TODO: restrict this to work with online StreamFeatureViews and validate the FeatureView type
try:
Expand All @@ -1433,7 +1438,28 @@ def _write_to_offline_store(
feature_view = self.get_feature_view(
feature_view_name, allow_registry_cache=allow_registry_cache
)
df.reset_index(drop=True)

# Get columns of the batch source and the input dataframe. Ignore columns with double
# underscores, which often signal an internal-use column.
column_names_and_types = feature_view.batch_source.get_table_column_names_and_types(
self.config
)
source_columns = [column for column, _ in column_names_and_types]
source_columns = [
column for column in source_columns if not re.match("__|__$", column)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there are columns with underscores, what is the behavior here? Does it just auto-fail? I'm confused about why we need to do this check — are we not writing to these internal columns?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah this isn't necessary; good catch

]
input_columns = df.columns.values.tolist()
input_columns = [
column for column in input_columns if not re.match("__|__$", column)
]

if set(input_columns) != set(source_columns):
raise ValueError(
f"The input dataframe has columns {set(input_columns)} but the batch source has columns {set(source_columns)}."
)

if reorder_columns:
df = df.reindex(columns=source_columns)

table = pa.Table.from_pandas(df)
provider = self._get_provider()
Expand Down
5 changes: 4 additions & 1 deletion sdk/python/feast/infra/contrib/spark_kafka_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,14 @@ def batch_write(row: DataFrame, batch_id: int):
)
rows["created"] = pd.to_datetime("now", utc=True)

# Reset indices to ensure the dataframe has all the required columns.
rows = rows.reset_index()

# Optionally execute preprocessor before writing to the online store.
if self.preprocess_fn:
rows = self.preprocess_fn(rows)

# Finally persist the data to the online store.
# Finally persist the data to the online store and/or offline store.
if rows.size > 0:
if to == PushMode.ONLINE or to == PushMode.ONLINE_AND_OFFLINE:
self.fs.write_to_online_store(self.sfv.name, rows)
Expand Down