Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat: Modified default example with different data types
Signed-off-by: ntkathole <nikhilkathole2683@gmail.com>
  • Loading branch information
ntkathole committed Feb 25, 2026
commit 2323181a15a1cb881d091cfb21718a332aa8c400
30 changes: 29 additions & 1 deletion sdk/python/feast/driver_test_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,38 @@ def create_driver_hourly_stats_df(drivers, start_date, end_date) -> pd.DataFrame
df_all_drivers["conv_rate"] = np.random.random(size=rows).astype(np.float32)
df_all_drivers["acc_rate"] = np.random.random(size=rows).astype(np.float32)
df_all_drivers["avg_daily_trips"] = np.random.randint(0, 1000, size=rows).astype(
np.int32
np.int64
)
df_all_drivers["created"] = pd.to_datetime(pd.Timestamp.now(tz=None).round("ms"))

# Complex type columns for Map, Json, and Struct examples
import json as _json

df_all_drivers["driver_metadata"] = [
{
"vehicle_type": np.random.choice(["sedan", "suv", "truck"]),
"rating": str(round(np.random.uniform(3.0, 5.0), 1)),
}
for _ in range(len(df_all_drivers))
]
df_all_drivers["driver_config"] = [
_json.dumps(
{
"max_distance_km": int(np.random.randint(10, 200)),
"preferred_zones": list(
np.random.choice(
["north", "south", "east", "west"], size=2, replace=False
)
),
}
)
for _ in range(len(df_all_drivers))
]
df_all_drivers["driver_profile"] = [
{"name": f"driver_{driver_id}", "age": str(int(np.random.randint(25, 60)))}
for driver_id in df_all_drivers["driver_id"]
]

# Create duplicate rows that should be filtered by created timestamp
# TODO: These duplicate rows are indirectly being filtered out by the point in time join already. We need to
# inject a bad row at a timestamp where we know it will get joined to the entity dataframe, and then test that
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,15 @@ def pa_to_trino_value_type(pa_type_as_str: str) -> str:
if pa_type_as_str.startswith("decimal"):
return trino_type.format(pa_type_as_str)

if pa_type_as_str.startswith("map<"):
return trino_type.format("varchar")

if pa_type_as_str == "large_string":
return trino_type.format("varchar")

if pa_type_as_str.startswith("struct<"):
return trino_type.format("varchar")

type_map = {
"null": "null",
"bool": "boolean",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from feast.feature_logging import LoggingConfig
from feast.infra.offline_stores.file_source import FileLoggingDestination
from feast.on_demand_feature_view import on_demand_feature_view
from feast.types import Float32, Float64, Int64
from feast.types import Float32, Float64, Int64, Json, Map, String, Struct

# Define a project for the feature repo
project = Project(name="%PROJECT_NAME%", description="A project for driver statistics")
Expand Down Expand Up @@ -52,12 +52,26 @@
Field(name="conv_rate", dtype=Float32),
Field(name="acc_rate", dtype=Float32),
Field(name="avg_daily_trips", dtype=Int64, description="Average daily trips"),
Field(
name="driver_metadata",
dtype=Map,
description="Driver metadata as key-value pairs",
),
Field(
name="driver_config", dtype=Json, description="Driver configuration as JSON"
),
Field(
name="driver_profile",
dtype=Struct({"name": String, "age": String}),
description="Driver profile as a typed struct",
),
],
online=True,
source=driver_stats_source,
# Tags are user defined key/value pairs that are attached to each
# feature view
tags={"team": "driver_performance"},
enable_validation=True,
)

# Define a request data source which encodes features / information only
Expand Down Expand Up @@ -119,6 +133,9 @@ def transformed_conv_rate(inputs: pd.DataFrame) -> pd.DataFrame:
Field(name="conv_rate", dtype=Float32),
Field(name="acc_rate", dtype=Float32),
Field(name="avg_daily_trips", dtype=Int64),
Field(name="driver_metadata", dtype=Map),
Field(name="driver_config", dtype=Json),
Field(name="driver_profile", dtype=Struct({"name": String, "age": String})),
],
online=True,
source=driver_stats_push_source, # Changed from above
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
import subprocess
from datetime import datetime

Expand Down Expand Up @@ -45,6 +46,11 @@ def run_demo():
"conv_rate": [1.0],
"acc_rate": [1.0],
"avg_daily_trips": [1000],
"driver_metadata": [{"vehicle_type": "truck", "rating": "5.0"}],
"driver_config": [
json.dumps({"max_distance_km": 500, "preferred_zones": ["north"]})
],
"driver_profile": [{"name": "driver_1001_updated", "age": "30"}],
}
)
print(event_df)
Expand Down Expand Up @@ -115,6 +121,9 @@ def fetch_online_features(store, source: str = ""):
else:
features_to_fetch = [
"driver_hourly_stats:acc_rate",
"driver_hourly_stats:driver_metadata",
"driver_hourly_stats:driver_config",
"driver_hourly_stats:driver_profile",
"transformed_conv_rate:conv_rate_plus_val1",
"transformed_conv_rate:conv_rate_plus_val2",
]
Expand Down
18 changes: 18 additions & 0 deletions sdk/python/feast/type_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -1319,6 +1319,12 @@ def pa_to_redshift_value_type(pa_type: "pyarrow.DataType") -> str:
if pa_type_as_str.startswith("map<"):
return "super"
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.

if pa_type_as_str == "large_string":
return "super"

if pa_type_as_str.startswith("struct<"):
return "super"

# We have to take into account how arrow types map to parquet types as well.
# For example, null type maps to int32 in parquet, so we have to use int4 in Redshift.
# Other mappings have also been adjusted accordingly.
Expand Down Expand Up @@ -1636,6 +1642,18 @@ def pa_to_athena_value_type(pa_type: "pyarrow.DataType") -> str:
if pa_type_as_str.startswith("python_values_to_proto_values"):
return pa_type_as_str

if pa_type_as_str.startswith("list"):
return "array<string>"

if pa_type_as_str.startswith("map<"):
return "string"

if pa_type_as_str == "large_string":
return "string"

if pa_type_as_str.startswith("struct<"):
return "string"

# We have to take into account how arrow types map to parquet types as well.
# For example, null type maps to int32 in parquet, so we have to use int4 in Redshift.
# Other mappings have also been adjusted accordingly.
Expand Down
Loading