Skip to content

Commit cfb32f2

Browse files
committed
add special case: rename
Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>
1 parent 7afc84a commit cfb32f2

File tree

2 files changed

+11
-8
lines changed

2 files changed

+11
-8
lines changed

python/feast_spark/pyspark/historical_feature_retrieval_job.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -322,13 +322,16 @@ class FileDestination(NamedTuple):
322322

323323
def _map_column(df: DataFrame, col_mapping: Dict[str, str]):
324324
source_to_alias_map = {v: k for k, v in col_mapping.items()}
325-
projection = [
326-
expr(col_mapping.get(col_name, col_name)).alias(
327-
source_to_alias_map.get(col_name, col_name)
328-
)
329-
for col_name in df.columns
330-
]
331-
return df.select(projection)
325+
projection = {}
326+
327+
for col_name in df.columns + list(set(col_mapping) - set(df.columns)):
328+
if col_name in source_to_alias_map:
329+
# column rename
330+
projection[source_to_alias_map.get(col_name)] = col(col_name)
331+
else:
332+
projection[col_name] = expr(col_mapping.get(col_name, col_name))
333+
334+
return df.select([c.alias(a) for a, c in projection.items()])
332335

333336

334337
def as_of_join(

python/tests/test_historical_feature_retrieval.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ def large_entity_csv_file(pytestconfig, spark):
4747
file_path = os.path.join(temp_dir, "large_entity")
4848
entity_schema = StructType(
4949
[
50-
StructField("customer_id", IntegerType()),
50+
StructField("id", IntegerType()),
5151
StructField("event_timestamp", TimestampType()),
5252
]
5353
)

0 commit comments

Comments
 (0)