Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fixed some tests
Signed-off-by: Francisco Javier Arceo <franciscojavierarceo@users.noreply.github.com>
  • Loading branch information
franciscojavierarceo committed Apr 3, 2024
commit 0ebfe7d5f3192c197dfa0e50e9fa8cf091eb208a
9 changes: 4 additions & 5 deletions sdk/python/tests/example_repos/example_feature_repo_1.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
import pandas as pd

from feast import Entity, FeatureService, FeatureView, Field, FileSource, PushSource
from feast.types import Float32, Int64, String
from feast.on_demand_feature_view import on_demand_feature_view
from feast.types import Float32, Int64, String

# Note that file source paths are not validated, so there doesn't actually need to be any data
# at the paths for these file sources. Since these paths are effectively fake, this example
Expand Down Expand Up @@ -101,16 +101,15 @@
tags={},
)


@on_demand_feature_view(
sources=[customer_profile],
schema=[
Field(name='on_demand_age', dtype=Int64)
],
schema=[Field(name="on_demand_age", dtype=Int64)],
mode="pandas",
)
def customer_profile_pandas_odfv(inputs: pd.DataFrame) -> pd.DataFrame:
outputs = pd.DataFrame()
outputs['on_demand_age'] = inputs['age'] + 1
outputs["on_demand_age"] = inputs["age"] + 1
return outputs


Expand Down
1 change: 1 addition & 0 deletions sdk/python/tests/unit/test_on_demand_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ def test_python_native_transformation_mode():
}
) == {"feature1": 0, "feature2": 1, "output1": 100, "output2": 102}


# def test_get_online_features_on_demand():


Expand Down
11 changes: 7 additions & 4 deletions sdk/python/tests/unit/test_on_demand_pandas_transformation.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,16 @@ def pandas_view(inputs: pd.DataFrame) -> pd.DataFrame:
df["conv_rate_plus_acc"] = inputs["conv_rate"] + inputs["acc_rate"]
return df

store.apply(
[driver, driver_stats_source, driver_stats_fv, pandas_view]
)
store.apply([driver, driver_stats_source, driver_stats_fv, pandas_view])

entity_rows = [
{
"driver_id": 1001,
}
]
store.write_to_online_store(
feature_view_name="driver_hourly_stats", df=driver_df
)

online_response = store.get_online_features(
entity_rows=entity_rows,
Expand All @@ -87,4 +88,6 @@ def pandas_view(inputs: pd.DataFrame) -> pd.DataFrame:
],
).to_df()

assert online_response["conv_rate_plus_acc"].equals(1)
assert online_response["conv_rate_plus_acc"].equals(
online_response["conv_rate"] + online_response["acc_rate"]
)
164 changes: 95 additions & 69 deletions sdk/python/tests/unit/test_on_demand_python_transformation.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import os
import tempfile
import unittest
from datetime import datetime, timedelta
from typing import Any, Dict

import pandas as pd

Expand All @@ -10,95 +12,119 @@
from feast.infra.online_stores.sqlite import SqliteOnlineStoreConfig
from feast.on_demand_feature_view import on_demand_feature_view
from feast.types import Float32, Float64, Int64
from typing import Dict, Any


def test_python_pandas_parity():
with tempfile.TemporaryDirectory() as data_dir:
store = FeatureStore(
config=RepoConfig(
project="test_on_demand_python_transformation",
registry=os.path.join(data_dir, "registry.db"),
provider="local",
entity_key_serialization_version=2,
online_store=SqliteOnlineStoreConfig(
path=os.path.join(data_dir, "online.db")
),


class TestOnDemandPythonTransformation(unittest.TestCase):
def setUp(self):
# data_dir = tempfile.TemporaryDirectory()
with tempfile.TemporaryDirectory() as data_dir:
self.store = FeatureStore(
config=RepoConfig(
project="test_on_demand_python_transformation",
registry=os.path.join(data_dir, "registry.db"),
provider="local",
entity_key_serialization_version=2,
online_store=SqliteOnlineStoreConfig(
path=os.path.join(data_dir, "online.db")
),
)
)
)

# Generate test data.
end_date = datetime.now().replace(microsecond=0, second=0, minute=0)
start_date = end_date - timedelta(days=15)
# Generate test data.
end_date = datetime.now().replace(microsecond=0, second=0, minute=0)
start_date = end_date - timedelta(days=15)

driver_entities = [1001, 1002, 1003, 1004, 1005]
driver_df = create_driver_hourly_stats_df(driver_entities, start_date, end_date)
driver_stats_path = os.path.join(data_dir, "driver_stats.parquet")
driver_df.to_parquet(path=driver_stats_path, allow_truncated_timestamps=True)
driver_entities = [1001, 1002, 1003, 1004, 1005]
driver_df = create_driver_hourly_stats_df(
driver_entities, start_date, end_date
)
driver_stats_path = os.path.join(data_dir, "driver_stats.parquet")
driver_df.to_parquet(
path=driver_stats_path, allow_truncated_timestamps=True
)

driver = Entity(name="driver", join_keys=["driver_id"])
driver = Entity(name="driver", join_keys=["driver_id"])

driver_stats_source = FileSource(
name="driver_hourly_stats_source",
path=driver_stats_path,
timestamp_field="event_timestamp",
created_timestamp_column="created",
)
driver_stats_source = FileSource(
name="driver_hourly_stats_source",
path=driver_stats_path,
timestamp_field="event_timestamp",
created_timestamp_column="created",
)

driver_stats_fv = FeatureView(
name="driver_hourly_stats",
entities=[driver],
ttl=0,
schema=[
Field(name="conv_rate", dtype=Float32),
Field(name="acc_rate", dtype=Float32),
Field(name="avg_daily_trips", dtype=Int64),
],
online=True,
source=driver_stats_source,
)
driver_stats_fv = FeatureView(
name="driver_hourly_stats",
entities=[driver],
ttl=timedelta(days=0),
schema=[
Field(name="conv_rate", dtype=Float32),
Field(name="acc_rate", dtype=Float32),
Field(name="avg_daily_trips", dtype=Int64),
],
online=True,
source=driver_stats_source,
)

@on_demand_feature_view(
sources=[driver_stats_fv],
schema=[Field(name="conv_rate_plus_acc", dtype=Float64)],
mode="pandas",
)
def pandas_view(inputs: pd.DataFrame) -> pd.DataFrame:
df = pd.DataFrame()
df["conv_rate_plus_acc"] = inputs["conv_rate"] + inputs["acc_rate"]
return df

# @on_demand_feature_view(
# sources=[driver_stats_fv[["conv_rate", "acc_rate"]]],
# schema=[Field(name="conv_rate_plus_acc_python", dtype=Float64)],
# mode="python",
# )
# def python_view(inputs: Dict[str, Any]) -> Dict[str, Any]:
# output: Dict[str, Any] = {'conv_rate_plus_acc_python': inputs['conv_rate'] + inputs['acc_rate']}
# return output

store.apply(
[driver, driver_stats_source, driver_stats_fv, pandas_view]
)
@on_demand_feature_view(
sources=[driver_stats_fv],
schema=[Field(name="conv_rate_plus_acc_pandas", dtype=Float64)],
mode="pandas",
)
def pandas_view(inputs: pd.DataFrame) -> pd.DataFrame:
df = pd.DataFrame()
df["conv_rate_plus_acc_pandas"] = (
inputs["conv_rate"] + inputs["acc_rate"]
)
return df

@on_demand_feature_view(
sources=[driver_stats_fv[["conv_rate", "acc_rate"]]],
schema=[Field(name="conv_rate_plus_acc_python", dtype=Float64)],
mode="python",
)
def python_view(inputs: Dict[str, Any]) -> Dict[str, Any]:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm still a little confused about the required signature here. Are these functions supposed to accept a dict of lists (looks like that in this test) and apply the udf for all entities at once? I thought from the previous PR that the goal was to have a udf that would be applied to individual entities...

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you also alter the tests so that more than one entity is passed? this will probably fail in such a case as only first entity is processed. If we are sticking with this signature, udf should look something like this:

return {
                'conv_rate_plus_acc_python': [
                    conv_rate + acc_rate
                    for conv_rate, acc_rate in zip(inputs['conv_rate'], inputs['acc_rate'])
                ]
            }

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you look at _infer_features_dict you'll see it expects a dict of lists. I added an explicit test that shows this will result in a type failure when running the apply operations. We can add singleton execution as a follow up but this is sufficient to highlight the currently supported behavior and then we can cut a release.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@franciscojavierarceo got it, good... that's probably more efficient anyway. no rush, but in that case it will probably be a good idea to change type annotations for relevant functions to Dict[str, List[Any]].

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually originally had that setup but I received a ton of type failures from that which is why I did it this way.

Let me address both of those as folllowups. I want to merge this and cut a release.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made an issue here #4075, will close it later.

output: Dict[str, Any] = {
"conv_rate_plus_acc_python": inputs["conv_rate"]
+ inputs["acc_rate"]
}
return output

self.store.apply(
[driver, driver_stats_source, driver_stats_fv, pandas_view, python_view]
)
self.store.write_to_online_store(
feature_view_name="driver_hourly_stats", df=driver_df
)

def test_python_pandas_parity(self):
entity_rows = [
{
"driver_id": 1001,
# "event_timestamp": datetime(2021, 4, 12, 10, 59, 42),
}
]

online_response = store.get_online_features(
online_python_response = self.store.get_online_features(
entity_rows=entity_rows,
features=[
"driver_hourly_stats:conv_rate",
"driver_hourly_stats:acc_rate",
"driver_hourly_stats:avg_daily_trips",
"python_view:conv_rate_plus_acc_python",
"pandas_view:conv_rate_plus_acc",
],
).to_df()

assert online_response["conv_rate_plus_acc"].equals(
online_response["conv_rate_plus_acc_python"]
online_pandas_response = self.store.get_online_features(
entity_rows=entity_rows,
features=[
"driver_hourly_stats:conv_rate",
"driver_hourly_stats:acc_rate",
"pandas_view:conv_rate_plus_acc_pandas",
],
).to_df()

assert (
online_python_response["conv_rate_plus_acc_python"]
.equals(online_pandas_response["conv_rate_plus_acc_pandas"])
.equals(
online_python_response["conv_rate"] + online_python_response["acc_rate"]
)
)