Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
66 commits
Select commit Hold shift + click to select a range
508bad2
merged changes
franciscojavierarceo Sep 24, 2024
899f7f4
saving progress
franciscojavierarceo Aug 17, 2024
5b69c71
merged changes to odfv
franciscojavierarceo Sep 24, 2024
d107354
linted
franciscojavierarceo Aug 18, 2024
a34ec4d
adding the test needed to show the expected behavior
franciscojavierarceo Aug 18, 2024
b95d2a2
updated test case
franciscojavierarceo Aug 21, 2024
47974c2
saving progress
franciscojavierarceo Aug 21, 2024
ceb75a2
merging
franciscojavierarceo Sep 24, 2024
6688933
merged
franciscojavierarceo Sep 24, 2024
4c28acc
merged
franciscojavierarceo Sep 24, 2024
fd577dc
merging
franciscojavierarceo Sep 24, 2024
167fe6c
adding the entity keys for now to do retrieval
franciscojavierarceo Aug 29, 2024
54811d7
adding entity to odfv
franciscojavierarceo Aug 29, 2024
f0d87fc
checking in progress...getting closer
franciscojavierarceo Aug 29, 2024
b7091ca
may have to revert some of this...looks like the challenge is getting…
franciscojavierarceo Aug 31, 2024
d2a12f8
moving things around to make it easier to debug
franciscojavierarceo Sep 1, 2024
9d496ba
debugging
franciscojavierarceo Sep 1, 2024
58280aa
merged
franciscojavierarceo Sep 24, 2024
c42be75
merging
franciscojavierarceo Sep 24, 2024
fb3b315
Rebasing and merging changes from other PR
franciscojavierarceo Sep 6, 2024
82f3f8b
Merging changes continued
franciscojavierarceo Sep 7, 2024
172693d
update the _make_inference to include odfvs with writes in the update…
franciscojavierarceo Sep 7, 2024
66c5b57
have the table being written now...the create table happens in the Sq…
franciscojavierarceo Sep 8, 2024
ea3b98a
checking in progress
franciscojavierarceo Sep 9, 2024
905912b
adding logs
franciscojavierarceo Sep 10, 2024
25d42dd
updating permissions
franciscojavierarceo Sep 10, 2024
03d6116
going to error out on purpose
franciscojavierarceo Sep 10, 2024
9b16615
adding unit test and merging changes
franciscojavierarceo Sep 18, 2024
adbaeb6
almost got everything working and type validation behaving
franciscojavierarceo Sep 18, 2024
11d2914
cleaned up and have tests behaving
franciscojavierarceo Sep 18, 2024
64375ee
adding print
franciscojavierarceo Sep 21, 2024
3e6912a
removing print
franciscojavierarceo Sep 21, 2024
5751a72
checking in progress
franciscojavierarceo Sep 23, 2024
b0208e1
updating test
franciscojavierarceo Sep 25, 2024
463d8bb
adding test
franciscojavierarceo Sep 25, 2024
2981817
linted and updated
franciscojavierarceo Sep 25, 2024
3a33368
removed print
franciscojavierarceo Sep 25, 2024
22bf637
updated tests to test actual behavior
franciscojavierarceo Sep 25, 2024
0d0d117
checking in progress
franciscojavierarceo Sep 28, 2024
5bff836
changing typo
franciscojavierarceo Sep 28, 2024
271f814
updating test
franciscojavierarceo Sep 28, 2024
754b0e8
testing changes
franciscojavierarceo Sep 28, 2024
25c7181
checking to see if thing still working
franciscojavierarceo Sep 29, 2024
1d4023f
removed print
franciscojavierarceo Sep 29, 2024
3662102
undo change for odfv file
franciscojavierarceo Sep 29, 2024
74e7ede
updated tests
franciscojavierarceo Sep 30, 2024
59940cf
okay well have the unit test working
franciscojavierarceo Oct 1, 2024
b223feb
type changes, hope i dont regret them
franciscojavierarceo Oct 1, 2024
01770e2
updated stream feature view piece
franciscojavierarceo Oct 2, 2024
7606481
updated sfv ifelse
franciscojavierarceo Oct 2, 2024
c4ebf18
removing print
franciscojavierarceo Oct 2, 2024
72add32
formatted and updated test
franciscojavierarceo Oct 2, 2024
24e0a84
resolving some linter errors
franciscojavierarceo Oct 3, 2024
b92bf32
fixed linter and formatting
franciscojavierarceo Oct 3, 2024
934c1e9
okay think it is working
franciscojavierarceo Oct 3, 2024
0ce93f2
linter
franciscojavierarceo Oct 3, 2024
bf31d59
updated type map for integration tests
franciscojavierarceo Oct 4, 2024
9aff889
Merge branch 'master' into podfv2
franciscojavierarceo Oct 5, 2024
acc7b7f
updated local feature store test
franciscojavierarceo Oct 5, 2024
12d9100
fixed local fs test
franciscojavierarceo Oct 5, 2024
3c94614
chore: Updated snowflake test to be more explicit about post apply en…
franciscojavierarceo Oct 5, 2024
f01b691
merging
franciscojavierarceo Oct 5, 2024
c26ae75
fixed test to entity_rows_to_read
franciscojavierarceo Oct 5, 2024
c6d55c7
resolved inf conflicts
franciscojavierarceo Oct 5, 2024
52b8c4d
lint
franciscojavierarceo Oct 5, 2024
ca0971a
Updated tests and lint, think I have everything working
franciscojavierarceo Oct 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
checking to see if thing still working
Signed-off-by: Francisco Javier Arceo <farceo@redhat.com>
  • Loading branch information
franciscojavierarceo committed Sep 30, 2024
commit 25c718183ee4795a92c96dca512589b2f56cd34c
66 changes: 35 additions & 31 deletions sdk/python/feast/inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,10 +219,13 @@ def _infer_features_and_entities(
columns_to_exclude.remove(mapped_col)
columns_to_exclude.add(original_col)

# this is what gets the right stuff
table_column_names_and_types = fv.batch_source.get_table_column_names_and_types(
config
)

print('\n', "*" * 50, '\ncolumn names and types\n', "*" * 50)
print(fv.name, list(zip(table_column_names_and_types)))
for col_name, col_datatype in table_column_names_and_types:
if col_name in columns_to_exclude:
continue
Expand Down Expand Up @@ -296,44 +299,45 @@ def _infer_on_demand_features_and_entities(
table_column_names_and_types = batch_source.get_table_column_names_and_types(
config
)
if batch_field_mapping:
for col_name, col_datatype in table_column_names_and_types:
if col_name in columns_to_exclude:
continue
elif col_name in join_keys:
batch_field_mapping = getattr(batch_source, "field_mapping", {})

for col_name, col_datatype in table_column_names_and_types:
if col_name in columns_to_exclude:
continue
elif col_name in join_keys:
field = Field(
name=col_name,
dtype=from_value_type(
batch_source.source_datatype_to_feast_value_type()(
col_datatype
)
),
)
if field.name not in [
entity_column.name
for entity_column in entity_columns
if hasattr(entity_column, "name")
]:
entity_columns.append(field)
elif not re.match(
"^__|__$", col_name
): # double underscores often signal an internal-use column
if run_inference_for_features:
feature_name = (
batch_field_mapping[col_name]
if col_name in batch_field_mapping
else col_name
)
field = Field(
name=col_name,
name=feature_name,
dtype=from_value_type(
batch_source.source_datatype_to_feast_value_type()(
col_datatype
)
),
)
if field.name not in [
entity_column.name
for entity_column in entity_columns
if hasattr(entity_column, "name")
feature.name for feature in source_feature_view.features
]:
entity_columns.append(field)
elif not re.match(
"^__|__$", col_name
): # double underscores often signal an internal-use column
if run_inference_for_features:
feature_name = (
batch_field_mapping[col_name]
if col_name in batch_field_mapping
else col_name
)
field = Field(
name=feature_name,
dtype=from_value_type(
batch_source.source_datatype_to_feast_value_type()(
col_datatype
)
),
)
if field.name not in [
feature.name for feature in source_feature_view.features
]:
source_feature_view.features.append(field)
source_feature_view.features.append(field)
fv.entity_columns = entity_columns
1 change: 0 additions & 1 deletion sdk/python/feast/on_demand_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,6 @@ def to_proto(self) -> OnDemandFeatureViewProto:
owner=self.owner,
write_to_online_store=self.write_to_online_store,
)
print("*" * 40, "\n", spec)

return OnDemandFeatureViewProto(spec=spec, meta=meta)

Expand Down
171 changes: 82 additions & 89 deletions sdk/python/tests/unit/test_on_demand_python_transformation.py
Original file line number Diff line number Diff line change
Expand Up @@ -493,8 +493,7 @@ def test_invalid_python_transformation_raises_type_error_on_apply():
schema=[Field(name="driver_name_lower", dtype=String)],
mode="python",
)
def python_view(inputs: dict[str, Any]) -> dict[str, Any]:
return {"driver_name_lower": []}
def python_view(inputs: dict[str, Any]) -> dict[str, Any]: return {"driver_name_lower": []}

with pytest.raises(
TypeError,
Expand All @@ -506,7 +505,7 @@ def python_view(inputs: dict[str, Any]) -> dict[str, Any]:


class TestOnDemandTransformationsWithWrites(unittest.TestCase):
def setUp(self):
def test_stored_writes(self):
with tempfile.TemporaryDirectory() as data_dir:
self.store = FeatureStore(
config=RepoConfig(
Expand Down Expand Up @@ -593,7 +592,6 @@ def python_stored_writes_feature_view(
"counter": [c + 1 for c in inputs["counter"]],
"input_datetime": [d for d in inputs["input_datetime"]],
}
print("running odfv transform")
return output

assert python_stored_writes_feature_view.entities == [driver.name]
Expand All @@ -613,15 +611,13 @@ def python_stored_writes_feature_view(
)

assert fv_applied.entities == [driver.name]
assert odfv_applied.entites == [driver.name]
assert odfv_applied.entities == [driver.name]

# Note here that after apply() is called, the entity_columns are populated with the join_key
assert fv_applied.entity_columns[0].name == driver.join_key
# assert fv_applied.entity_columns[0].name == driver.join_key
assert fv_applied.entity_columns == []
assert odfv_applied.entity_columns[0].name == driver.join_key

self.store.write_to_online_store(
feature_view_name="driver_hourly_stats", df=driver_df
)
assert len(self.store.list_all_feature_views()) == 2
assert len(self.store.list_feature_views()) == 1
assert len(self.store.list_on_demand_feature_views()) == 1
Expand All @@ -631,84 +627,81 @@ def python_stored_writes_feature_view(
== self.store.get_feature_view("driver_hourly_stats").entity_columns
)
assert (
self.store.get_on_demand_feature_view(
"python_stored_writes_feature_view"
).entity_columns
== self.store.get_feature_view("driver_hourly_stats").entity_columns
)

def test_stored_writes(self):
current_datetime = _utc_now()
fv_entity_rows_to_write = [
{
"driver_id": 1001,
"conv_rate": 0.25,
"acc_rate": 0.25,
"avg_daily_trips": 2,
"event_timestamp": current_datetime,
"created": current_datetime,
}
]
odfv_entity_rows_to_write = [
{
"driver_id": 1001,
"counter": 0,
"input_datetime": current_datetime,
}
]
fv_entity_rows_to_read = [
{
"driver_id": 1001,
}
]
# Note that here we shouldn't have to pass the request source features for reading
# because they should have already been written to the online store
odfv_entity_rows_to_read = [
{
"driver_id": 1001,
"conv_rate": 0.25,
"acc_rate": 0.25,
"counter": 0,
"input_datetime": current_datetime,
}
]
print("storing fv features")
self.store.write_to_online_store(
feature_view_name="driver_hourly_stats",
df=fv_entity_rows_to_write,
)
print("reading fv features")
online_python_response = self.store.get_online_features(
entity_rows=fv_entity_rows_to_read,
features=[
"driver_hourly_stats:conv_rate",
"driver_hourly_stats:acc_rate",
"driver_hourly_stats:avg_daily_trips",
],
).to_dict()
print(online_python_response)
print("storing odfv features")
self.store.write_to_online_store(
feature_view_name="python_stored_writes_feature_view",
df=odfv_entity_rows_to_write,
)
print("reading odfv features")
online_python_response = self.store.get_online_features(
entity_rows=odfv_entity_rows_to_read,
features=[
"python_stored_writes_feature_view:conv_rate_plus_acc",
"python_stored_writes_feature_view:current_datetime",
"python_stored_writes_feature_view:counter",
"python_stored_writes_feature_view:input_datetime",
],
).to_dict()
print(online_python_response)
assert sorted(list(online_python_response.keys())) == sorted(
[
"driver_id",
"conv_rate_plus_acc",
"counter",
"current_datetime",
"input_datetime",
python_stored_writes_feature_view.entity_columns
== self.store.get_on_demand_feature_view("python_stored_writes_feature_view").entity_columns
)

current_datetime = _utc_now()
fv_entity_rows_to_write = [
{
"driver_id": 1001,
"conv_rate": 0.25,
"acc_rate": 0.25,
"avg_daily_trips": 2,
"event_timestamp": current_datetime,
"created": current_datetime,
}
]
)
odfv_entity_rows_to_write = [
{
"driver_id": 1001,
"counter": 0,
"input_datetime": current_datetime,
}
]
fv_entity_rows_to_read = [
{
"driver_id": 1001,
}
]
# Note that here we shouldn't have to pass the request source features for reading
# because they should have already been written to the online store
odfv_entity_rows_to_read = [
{
"driver_id": 1001,
"conv_rate": 0.25,
"acc_rate": 0.25,
"counter": 0,
"input_datetime": current_datetime,
}
]
print("storing fv features")
self.store.write_to_online_store(
feature_view_name="driver_hourly_stats",
df=fv_entity_rows_to_write,
)
print("reading fv features")
online_python_response = self.store.get_online_features(
entity_rows=fv_entity_rows_to_read,
features=[
"driver_hourly_stats:conv_rate",
"driver_hourly_stats:acc_rate",
"driver_hourly_stats:avg_daily_trips",
],
).to_dict()
print(online_python_response)
print("storing odfv features")
self.store.write_to_online_store(
feature_view_name="python_stored_writes_feature_view",
df=odfv_entity_rows_to_write,
)
print("reading odfv features")
online_python_response = self.store.get_online_features(
entity_rows=odfv_entity_rows_to_read,
features=[
"python_stored_writes_feature_view:conv_rate_plus_acc",
"python_stored_writes_feature_view:current_datetime",
"python_stored_writes_feature_view:counter",
"python_stored_writes_feature_view:input_datetime",
],
).to_dict()
print(online_python_response)
assert sorted(list(online_python_response.keys())) == sorted(
[
"driver_id",
"conv_rate_plus_acc",
"counter",
"current_datetime",
"input_datetime",
]
)