Skip to content

Commit fc07882

Browse files
committed
Add geo features to module 2
Signed-off-by: Danny Chiao <danny@tecton.ai>
1 parent cb3f800 commit fc07882

File tree

6 files changed

+89
-9
lines changed

6 files changed

+89
-9
lines changed

module_2/README.md

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,27 @@ TODO
44
- Define request data
55
- Define on demand transforms
66
- Note that this can also transform pushed features (e.g. stream features)
7-
- Note that this can combine multiple feature views and request data
7+
- Note that this can combine multiple feature views and request data
8+
9+
10+
<h1>Module 2: On demand transformations</h1>
11+
12+
In this module, we introduce the concept of on demand transforms. These are transformations that execute on-the-fly and accept as input other feature views or request data.
13+
14+
We focus on building features for online serving, and keeping them fresh with a combination of batch feature materialization and stream feature ingestion. We'll be roughly working towards the following:
15+
16+
- **Data sources**: Kafka + File source
17+
- **Online store**: Redis
18+
- **Use case**: Predicting churn for drivers in real time.
19+
20+
<img src="architecture.png" width=750>
21+
22+
<h2>Table of Contents</h2>
23+
24+
# Workshop
25+
## Step 1: Install Feast
26+
27+
First, we install Feast with Spark and Redis support:
28+
```bash
29+
pip install "feast[spark,redis]"
30+
```
91.7 KB
Binary file not shown.

module_2/data/gen_lat_lon.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
import numpy as np
2+
import math
3+
4+
import pandas as pd
5+
6+
7+
def gen_lat_lon():
    """Add random ``lat``/``lon`` columns to the driver stats parquet file.

    Reads ``driver_stats.parquet`` from the current working directory,
    attaches one random coordinate pair per row, and writes the result to
    ``driver_stats_lat_lon.parquet``.

    Coordinates are stored in decimal degrees (``lat`` in [-90, 90],
    ``lon`` in [-180, 180)), which is the unit geohash encoders expect.
    """
    df = pd.read_parquet("driver_stats.parquet")

    # Size the samples from the data instead of a hard-coded row count, so
    # the script keeps working if the input file is regenerated.
    num_rows = len(df)

    # arccos(2u - 1) yields a colatitude in radians within [0, pi]; shifting
    # it by 90 degrees turns it into a latitude within [-90, 90].
    colatitude = np.arccos(np.random.random(size=num_rows) * 2 - 1)
    longitude = np.random.random(size=num_rows) * 360.0 - 180.0

    df["lat"] = 90.0 - np.degrees(colatitude)
    df["lon"] = longitude
    df.to_parquet("driver_stats_lat_lon.parquet")
15+
16+
17+
if __name__ == "__main__":
18+
gen_lat_lon()

module_2/feature_repo/data_sources.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,11 @@
44
PushSource,
55
RequestSource,
66
)
7-
from feast.types import Int64
7+
from feast.types import Int64, Float32
88

99
driver_stats = FileSource(
1010
name="driver_stats_source",
11-
path="../data/driver_stats.parquet", # Should be a remote path in reality for re-use
11+
path="../data/driver_stats_lat_lon.parquet", # Should be a remote path in reality for re-use
1212
timestamp_field="event_timestamp",
1313
created_timestamp_column="created",
1414
description="A table describing the stats of a driver based on hourly logs",
@@ -22,10 +22,12 @@
2222

2323
# Define a request data source which encodes features / information only
2424
# available at request time (e.g. part of the user initiated HTTP request)
25-
input_request = RequestSource(
26-
name="vals_to_add",
25+
driver_request = RequestSource(
26+
name="driver_request",
2727
schema=[
2828
Field(name="val_to_add", dtype=Int64),
2929
Field(name="val_to_add_2", dtype=Int64),
30+
Field(name="lat", dtype=Float32),
31+
Field(name="lon", dtype=Float32),
3032
],
3133
)

module_2/feature_repo/feature_services.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,16 @@
1717
],
1818
owner="test3@gmail.com",
1919
)
20+
21+
feature_service = FeatureService(
22+
name="model_v3",
23+
features=[
24+
driver_hourly_stats_view,
25+
driver_daily_features_view,
26+
transformed_conv_rate,
27+
avg_hourly_miles_driven,
28+
location_features,
29+
],
30+
owner="test3@gmail.com",
31+
)
32+

module_2/feature_repo/features.py

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
Field,
77
)
88
from feast.on_demand_feature_view import on_demand_feature_view
9-
from feast.types import Float32, Float64
9+
from feast.types import Float32, Float64, String
1010

1111
from data_sources import *
1212
from entities import *
@@ -19,6 +19,8 @@
1919
schema=[
2020
Field(name="conv_rate", dtype=Float32),
2121
Field(name="acc_rate", dtype=Float32),
22+
Field(name="lat", dtype=Float32),
23+
Field(name="lon", dtype=Float32),
2224
],
2325
online=True,
2426
source=driver_stats,
@@ -41,16 +43,38 @@
4143
# Define an on demand feature view which can generate new features based on
4244
# existing feature views and RequestSource features
4345
@on_demand_feature_view(
44-
sources=[driver_hourly_stats_view, driver_daily_features_view, input_request,],
46+
sources=[driver_hourly_stats_view, driver_request],
4547
schema=[
4648
Field(name="conv_rate_plus_val1", dtype=Float64),
4749
Field(name="conv_rate_plus_val2", dtype=Float64),
48-
Field(name="daily_miles_driven_normalized", dtype=Float64),
4950
],
5051
)
5152
def transformed_conv_rate(inputs: pd.DataFrame) -> pd.DataFrame:
5253
df = pd.DataFrame()
5354
df["conv_rate_plus_val1"] = inputs["conv_rate"] + inputs["val_to_add"]
5455
df["conv_rate_plus_val2"] = inputs["conv_rate"] + inputs["val_to_add_2"]
55-
df["daily_miles_driven_normalized"] = inputs["daily_miles_driven"] / 1000
56+
return df
57+
58+
59+
@on_demand_feature_view(
    sources=[driver_daily_features_view],
    schema=[Field(name="avg_hourly_miles_driven", dtype=Float64)],
)
def avg_hourly_miles_driven(inputs: pd.DataFrame) -> pd.DataFrame:
    """Return ``daily_miles_driven`` divided by 24 as a one-column frame.

    The single output column is named ``avg_hourly_miles_driven`` and keeps
    the index of ``inputs``.
    """
    hourly_average = inputs["daily_miles_driven"] / 24
    return hourly_average.to_frame(name="avg_hourly_miles_driven")
67+
68+
69+
@on_demand_feature_view(
    sources=[driver_daily_features_view, driver_request],
    schema=[Field(name=f"geohash_{i}", dtype=String) for i in range(1, 7)],
)
def location_features(inputs: pd.DataFrame) -> pd.DataFrame:
    """Derive geohash prefix features from the ``lat``/``lon`` input columns.

    Each row's ``lat`` and ``lon`` are encoded into a geohash string, and the
    first 1 through 6 characters of that string are returned as the columns
    ``geohash_1`` ... ``geohash_6``.

    Args:
        inputs: Frame containing ``lat`` and ``lon`` columns.

    Returns:
        A frame with the same index as ``inputs`` and one string column per
        prefix length.
    """
    import pygeohash as gh

    # Encode from `inputs`; the output frame starts out empty and has no
    # lat/lon columns to read. Building the Series explicitly (instead of a
    # row-wise DataFrame.apply) also keeps the result a Series when `inputs`
    # has zero rows, so the `.str` slicing below still works.
    geohash = pd.Series(
        [gh.encode(lat, lon) for lat, lon in zip(inputs["lat"], inputs["lon"])],
        index=inputs.index,
        dtype="object",
    )

    df = pd.DataFrame(index=inputs.index)
    for i in range(1, 7):
        df[f"geohash_{i}"] = geohash.str[:i]
    return df

0 commit comments

Comments
 (0)