Skip to content

Commit 49142c2

Browse files
authored
Implicit type conversion for entity and feature table source (#43)
* Implicit type conversion for entity and feature table source
  Signed-off-by: Khor Shu Heng <khor.heng@go-jek.com>
* Restrict type casting only for double to float conversion
  Signed-off-by: Khor Shu Heng <khor.heng@go-jek.com>
* Make allowable type casting configurable
  Signed-off-by: Khor Shu Heng <khor.heng@go-jek.com>
1 parent 72a3f4c commit 49142c2

File tree

9 files changed

+929
-3
lines changed

9 files changed

+929
-3
lines changed

python/feast_spark/pyspark/historical_feature_retrieval_job.py

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -560,6 +560,22 @@ def _read_and_verify_entity_df_from_source(
560560
return mapped_entity_df
561561

562562

563+
def _type_casting_allowed(
    feature_type: str, source_col_type, allowed_casting=None
) -> bool:
    """Return whether a source column type is acceptable for a feature type.

    A source column is acceptable when its type already equals
    ``feature_type``, or when the casting table lists ``feature_type`` as a
    permitted target for ``source_col_type``.

    Args:
        feature_type: Spark type name declared for the entity or feature.
        source_col_type: Spark type name of the matching source column, as
            looked up from ``DataFrame.dtypes``. May be ``None`` when the
            lookup found no such column; in that case the result is ``False``
            unless ``feature_type`` is also ``None``.
        allowed_casting: Optional mapping from a source column type to the
            collection of feature types it may be cast to. When omitted, only
            ``double`` -> ``float`` is permitted.

    Returns:
        ``True`` if the types match or the cast is permitted, else ``False``.
    """
    if feature_type == source_col_type:
        return True

    if allowed_casting is None:
        # Default policy: the only implicit conversion is double -> float.
        allowed_casting = {"double": ["float"]}

    permitted_targets = allowed_casting.get(source_col_type)
    # A missing (or explicitly None) entry means no cast is permitted.
    return permitted_targets is not None and feature_type in permitted_targets
577+
578+
563579
def _read_and_verify_feature_table_df_from_source(
564580
spark: SparkSession, feature_table: FeatureTable, source: Source,
565581
) -> DataFrame:
@@ -591,10 +607,16 @@ def _read_and_verify_feature_table_df_from_source(
591607
feature_table_dtypes = dict(mapped_source_df.dtypes)
592608
for field in feature_table.entities + feature_table.features:
593609
column_type = feature_table_dtypes.get(field.name)
610+
594611
if column_type != field.spark_type:
595-
raise SchemaError(
596-
f"{field.name} should be of {field.spark_type} type, but is {column_type} instead"
597-
)
612+
if _type_casting_allowed(field.spark_type, column_type):
613+
mapped_source_df = mapped_source_df.withColumn(
614+
field.name, col(field.name).cast(field.spark_type)
615+
)
616+
else:
617+
raise SchemaError(
618+
f"{field.name} should be of {field.spark_type} type, but is {column_type} instead"
619+
)
598620

599621
for timestamp_column in [
600622
source.event_timestamp_column,

python/tests/data/bookings.csv

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
driver_id,event_timestamp,created_timestamp,completed_bookings
2+
8001,2020-08-31T00:00:00.000,2020-08-31T00:00:00.000,200
3+
8001,2020-09-01T00:00:00.000,2020-09-01T00:00:00.000,300
4+
8002,2020-09-01T00:00:00.000,2020-09-01T00:00:00.000,600
5+
8002,2020-09-01T00:00:00.000,2020-09-02T00:00:00.000,500
6+
8003,2020-09-01T00:00:00.000,2020-09-02T00:00:00.000,700
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
id,event_timestamp
2+
1001,2020-09-02T00:00:00.000
3+
1001,2020-09-03T00:00:00.000
4+
2001,2020-09-04T00:00:00.000
5+
2001,2020-09-04T00:00:00.000
6+
3001,2020-09-04T00:00:00.000
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
customer_id,total_bookings,datetime,created_datetime
2+
1001,200,2020-09-02T00:00:00.000,2020-09-02T00:00:00.000
3+
1001,400,2020-09-04T00:00:00.000,2020-09-02T00:00:00.000
4+
2001,500,2020-09-03T00:00:00.000,2020-09-01T00:00:00.000
5+
2001,600,2020-09-03T00:00:00.000,2020-09-02T00:00:00.000
6+
3001,700,2020-09-03T00:00:00.000,2020-09-03T00:00:00.000
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
customer_id,driver_id,event_timestamp
2+
1001,8001,2020-09-02T00:00:00.000
3+
1001,8002,2020-09-02T00:00:00.000
4+
1001,8002,2020-09-03T00:00:00.000
5+
2001,8002,2020-09-03T00:00:00.000
6+
2001,8002,2020-09-04T00:00:00.000

python/tests/data/customers.csv

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
customer_id,event_timestamp
2+
1001,2020-09-02T00:00:00.000
3+
1002,2020-09-02T00:00:00.000
4+
1003,2020-09-03T00:00:00.000
5+
1004,2020-09-03T00:00:00.000
6+
1005,2020-09-04T00:00:00.000
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
customer_id,event_timestamp
2+
1001,2020-09-02T00:00:00.000

python/tests/data/transactions.csv

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
customer_id,event_timestamp,created_timestamp,daily_transactions
2+
1001,2020-08-31T00:00:00.000,2020-09-01T00:00:00.000,50.0
3+
1001,2020-09-01T00:00:00.000,2020-09-01T00:00:00.000,100.0
4+
2001,2020-09-01T00:00:00.000,2020-08-31T00:00:00.000,80.0
5+
2001,2020-09-01T00:00:00.000,2020-09-01T00:00:00.000,200.0
6+
3001,2020-09-01T00:00:00.000,2020-09-01T00:00:00.000,300.0

0 commit comments

Comments
 (0)