Skip to content

Commit 7e7ac40

Browse files
committed
feat: Modified default example with different data types
Signed-off-by: ntkathole <nikhilkathole2683@gmail.com>
1 parent 3b2663a commit 7e7ac40

File tree

5 files changed

+83
-2
lines changed

5 files changed

+83
-2
lines changed

sdk/python/feast/driver_test_data.py

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,10 +136,38 @@ def create_driver_hourly_stats_df(drivers, start_date, end_date) -> pd.DataFrame
136136
df_all_drivers["conv_rate"] = np.random.random(size=rows).astype(np.float32)
137137
df_all_drivers["acc_rate"] = np.random.random(size=rows).astype(np.float32)
138138
df_all_drivers["avg_daily_trips"] = np.random.randint(0, 1000, size=rows).astype(
139-
np.int32
139+
np.int64
140140
)
141141
df_all_drivers["created"] = pd.to_datetime(pd.Timestamp.now(tz=None).round("ms"))
142142

143+
# Complex type columns for Map, Json, and Struct examples
144+
import json as _json
145+
146+
df_all_drivers["driver_metadata"] = [
147+
{
148+
"vehicle_type": np.random.choice(["sedan", "suv", "truck"]),
149+
"rating": str(round(np.random.uniform(3.0, 5.0), 1)),
150+
}
151+
for _ in range(len(df_all_drivers))
152+
]
153+
df_all_drivers["driver_config"] = [
154+
_json.dumps(
155+
{
156+
"max_distance_km": int(np.random.randint(10, 200)),
157+
"preferred_zones": list(
158+
np.random.choice(
159+
["north", "south", "east", "west"], size=2, replace=False
160+
)
161+
),
162+
}
163+
)
164+
for _ in range(len(df_all_drivers))
165+
]
166+
df_all_drivers["driver_profile"] = [
167+
{"name": f"driver_{driver_id}", "age": str(int(np.random.randint(25, 60)))}
168+
for driver_id in df_all_drivers["driver_id"]
169+
]
170+
143171
# Create duplicate rows that should be filtered by created timestamp
144172
# TODO: These duplicate rows are indirectly being filtered out by the point in time join already. We need to
145173
# inject a bad row at a timestamp where we know it will get joined to the entity dataframe, and then test that

sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino_type_map.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,15 @@ def pa_to_trino_value_type(pa_type_as_str: str) -> str:
6969
if pa_type_as_str.startswith("decimal"):
7070
return trino_type.format(pa_type_as_str)
7171

72+
if pa_type_as_str.startswith("map<"):
73+
return trino_type.format("varchar")
74+
75+
if pa_type_as_str == "large_string":
76+
return trino_type.format("varchar")
77+
78+
if pa_type_as_str.startswith("struct<"):
79+
return trino_type.format("varchar")
80+
7281
type_map = {
7382
"null": "null",
7483
"bool": "boolean",

sdk/python/feast/templates/local/feature_repo/feature_definitions.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
from feast.feature_logging import LoggingConfig
1818
from feast.infra.offline_stores.file_source import FileLoggingDestination
1919
from feast.on_demand_feature_view import on_demand_feature_view
20-
from feast.types import Float32, Float64, Int64
20+
from feast.types import Float32, Float64, Int64, Json, Map, String, Struct
2121

2222
# Define a project for the feature repo
2323
project = Project(name="%PROJECT_NAME%", description="A project for driver statistics")
@@ -52,12 +52,26 @@
5252
Field(name="conv_rate", dtype=Float32),
5353
Field(name="acc_rate", dtype=Float32),
5454
Field(name="avg_daily_trips", dtype=Int64, description="Average daily trips"),
55+
Field(
56+
name="driver_metadata",
57+
dtype=Map,
58+
description="Driver metadata as key-value pairs",
59+
),
60+
Field(
61+
name="driver_config", dtype=Json, description="Driver configuration as JSON"
62+
),
63+
Field(
64+
name="driver_profile",
65+
dtype=Struct({"name": String, "age": String}),
66+
description="Driver profile as a typed struct",
67+
),
5568
],
5669
online=True,
5770
source=driver_stats_source,
5871
# Tags are user defined key/value pairs that are attached to each
5972
# feature view
6073
tags={"team": "driver_performance"},
74+
enable_validation=True,
6175
)
6276

6377
# Define a request data source which encodes features / information only
@@ -119,6 +133,9 @@ def transformed_conv_rate(inputs: pd.DataFrame) -> pd.DataFrame:
119133
Field(name="conv_rate", dtype=Float32),
120134
Field(name="acc_rate", dtype=Float32),
121135
Field(name="avg_daily_trips", dtype=Int64),
136+
Field(name="driver_metadata", dtype=Map),
137+
Field(name="driver_config", dtype=Json),
138+
Field(name="driver_profile", dtype=Struct({"name": String, "age": String})),
122139
],
123140
online=True,
124141
source=driver_stats_push_source, # Changed from above

sdk/python/feast/templates/local/feature_repo/test_workflow.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import json
12
import subprocess
23
from datetime import datetime
34

@@ -45,6 +46,11 @@ def run_demo():
4546
"conv_rate": [1.0],
4647
"acc_rate": [1.0],
4748
"avg_daily_trips": [1000],
49+
"driver_metadata": [{"vehicle_type": "truck", "rating": "5.0"}],
50+
"driver_config": [
51+
json.dumps({"max_distance_km": 500, "preferred_zones": ["north"]})
52+
],
53+
"driver_profile": [{"name": "driver_1001_updated", "age": "30"}],
4854
}
4955
)
5056
print(event_df)
@@ -115,6 +121,9 @@ def fetch_online_features(store, source: str = ""):
115121
else:
116122
features_to_fetch = [
117123
"driver_hourly_stats:acc_rate",
124+
"driver_hourly_stats:driver_metadata",
125+
"driver_hourly_stats:driver_config",
126+
"driver_hourly_stats:driver_profile",
118127
"transformed_conv_rate:conv_rate_plus_val1",
119128
"transformed_conv_rate:conv_rate_plus_val2",
120129
]

sdk/python/feast/type_map.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1258,6 +1258,12 @@ def pa_to_redshift_value_type(pa_type: "pyarrow.DataType") -> str:
12581258
if pa_type_as_str.startswith("map<"):
12591259
return "super"
12601260

1261+
if pa_type_as_str == "large_string":
1262+
return "super"
1263+
1264+
if pa_type_as_str.startswith("struct<"):
1265+
return "super"
1266+
12611267
# We have to take into account how arrow types map to parquet types as well.
12621268
# For example, null type maps to int32 in parquet, so we have to use int4 in Redshift.
12631269
# Other mappings have also been adjusted accordingly.
@@ -1575,6 +1581,18 @@ def pa_to_athena_value_type(pa_type: "pyarrow.DataType") -> str:
15751581
if pa_type_as_str.startswith("python_values_to_proto_values"):
15761582
return pa_type_as_str
15771583

1584+
if pa_type_as_str.startswith("list"):
1585+
return "array<string>"
1586+
1587+
if pa_type_as_str.startswith("map<"):
1588+
return "string"
1589+
1590+
if pa_type_as_str == "large_string":
1591+
return "string"
1592+
1593+
if pa_type_as_str.startswith("struct<"):
1594+
return "string"
1595+
15781596
# We have to take into account how arrow types map to parquet types as well.
15791597
# For example, null type maps to int32 in parquet, so we have to use int4 in Redshift.
15801598
# Other mappings have also been adjusted accordingly.

0 commit comments

Comments
 (0)