Skip to content

Commit 754b0e8

Browse files
testing changes
Signed-off-by: Francisco Javier Arceo <farceo@redhat.com>
1 parent 271f814 commit 754b0e8

File tree

5 files changed

+61
-67
lines changed

5 files changed

+61
-67
lines changed

sdk/python/feast/feature_store.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -889,12 +889,14 @@ def apply(
889889
if isinstance(fv, FeatureView):
890890
data_sources_set_to_update.add(fv.batch_source)
891891
if isinstance(fv, StreamFeatureView):
892-
data_sources_set_to_update.add(fv.stream_source)
892+
if fv.stream_source:
893+
data_sources_set_to_update.add(fv.stream_source)
893894
if isinstance(fv, OnDemandFeatureView):
894895
for source_fvp in fv.source_feature_view_projections:
895-
data_sources_set_to_update.add(
896-
fv.source_feature_view_projections[source_fvp].batch_source
897-
)
896+
if fv.source_feature_view_projections[source_fvp].batch_source:
897+
data_sources_set_to_update.add(
898+
fv.source_feature_view_projections[source_fvp].batch_source
899+
)
898900
else:
899901
pass
900902

@@ -909,7 +911,9 @@ def apply(
909911

910912
# Validate all feature views and make inferences.
911913
self._validate_all_feature_views(
912-
views_to_update, odfvs_to_update, sfvs_to_update,
914+
views_to_update,
915+
odfvs_to_update,
916+
sfvs_to_update,
913917
)
914918
self._make_inferences(
915919
data_sources_to_update,

sdk/python/feast/inference.py

Lines changed: 31 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -296,41 +296,44 @@ def _infer_on_demand_features_and_entities(
296296
table_column_names_and_types = batch_source.get_table_column_names_and_types(
297297
config
298298
)
299-
for col_name, col_datatype in table_column_names_and_types:
300-
if col_name in columns_to_exclude:
301-
continue
302-
elif col_name in join_keys:
303-
field = Field(
304-
name=col_name,
305-
dtype=from_value_type(
306-
batch_source.source_datatype_to_feast_value_type()(col_datatype)
307-
),
308-
)
309-
if field.name not in [
310-
entity_column.name
311-
for entity_column in entity_columns
312-
if hasattr(entity_column, "name")
313-
]:
314-
entity_columns.append(field)
315-
elif not re.match(
316-
"^__|__$", col_name
317-
): # double underscores often signal an internal-use column
318-
if run_inference_for_features:
319-
feature_name = (
320-
batch_field_mapping[col_name]
321-
if col_name in batch_field_mapping
322-
else col_name
323-
)
299+
if batch_field_mapping:
300+
for col_name, col_datatype in table_column_names_and_types:
301+
if col_name in columns_to_exclude:
302+
continue
303+
elif col_name in join_keys:
324304
field = Field(
325-
name=feature_name,
305+
name=col_name,
326306
dtype=from_value_type(
327307
batch_source.source_datatype_to_feast_value_type()(
328308
col_datatype
329309
)
330310
),
331311
)
332312
if field.name not in [
333-
feature.name for feature in source_feature_view.features
313+
entity_column.name
314+
for entity_column in entity_columns
315+
if hasattr(entity_column, "name")
334316
]:
335-
source_feature_view.features.append(field)
317+
entity_columns.append(field)
318+
elif not re.match(
319+
"^__|__$", col_name
320+
): # double underscores often signal an internal-use column
321+
if run_inference_for_features:
322+
feature_name = (
323+
batch_field_mapping[col_name]
324+
if col_name in batch_field_mapping
325+
else col_name
326+
)
327+
field = Field(
328+
name=feature_name,
329+
dtype=from_value_type(
330+
batch_source.source_datatype_to_feast_value_type()(
331+
col_datatype
332+
)
333+
),
334+
)
335+
if field.name not in [
336+
feature.name for feature in source_feature_view.features
337+
]:
338+
source_feature_view.features.append(field)
336339
fv.entity_columns = entity_columns

sdk/python/feast/infra/online_stores/sqlite.py

Lines changed: 10 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
from pathlib import Path
2222
from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple, Union
2323

24-
import pandas as pd
2524
from google.protobuf.internal.containers import RepeatedScalarFieldContainer
2625
from pydantic import StrictStr
2726

@@ -171,40 +170,18 @@ def online_write_batch(
171170
),
172171
)
173172

174-
try:
175-
conn.execute(
176-
f"""INSERT OR IGNORE INTO {table_name}
173+
conn.execute(
174+
f"""INSERT OR IGNORE INTO {table_name}
177175
(entity_key, feature_name, value, event_ts, created_ts)
178176
VALUES (?, ?, ?, ?, ?)""",
179-
(
180-
entity_key_bin,
181-
feature_name,
182-
val.SerializeToString(),
183-
timestamp,
184-
created_ts,
185-
),
186-
)
187-
except Exception as e:
188-
# print(
189-
# f"error writing online batch for {table_name} - {feature_name} = {val}\n {e}"
190-
# )
191-
print(
192-
f'querying all records for table: {conn.execute(f"select * from {table_name}").fetchall()}'
193-
)
194-
def get_table_data(conn):
195-
x = conn.execute(f"select * from sqlite_master").fetchall()
196-
y = conn.execute(f"select * from sqlite_master")
197-
names = list(map(lambda x: x[0], y.description))
198-
return pd.DataFrame(x, columns=names)
199-
200-
df = get_table_data(conn)
201-
tmp = [ conn.execute(f"select count(*) from {table_name}").fetchall() for table_name in df['name'].values if table_name not in ['sqlite_autoindex_test_on_demand_python_transformation_driver_hourly_stats_1', 'test_on_demand_python_transformation_driver_hourly_stats_ek', 'sqlite_autoindex_test_on_demand_python_transformation_python_stored_writes_feature_view_1', 'test_on_demand_python_transformation_python_stored_writes_feature_view_ek']]
202-
print(tmp)
203-
204-
r = conn.execute("""
205-
SELECT * FROM sqlite_master WHERE type='table' and name = 'test_on_demand_python_transformation_python_stored_writes_feature_view';
206-
""")
207-
print(f"table exists: {r.fetchall()}")
177+
(
178+
entity_key_bin,
179+
feature_name,
180+
val.SerializeToString(),
181+
timestamp,
182+
created_ts,
183+
),
184+
)
208185
if progress:
209186
progress(1)
210187

@@ -271,7 +248,6 @@ def update(
271248
project = config.project
272249

273250
for table in tables_to_keep:
274-
print(f"updating {_table_id(project, table)}")
275251
conn.execute(
276252
f"CREATE TABLE IF NOT EXISTS {_table_id(project, table)} (entity_key BLOB, feature_name TEXT, value BLOB, vector_value BLOB, event_ts timestamp, created_ts timestamp, PRIMARY KEY(entity_key, feature_name))"
277253
)

sdk/python/feast/on_demand_feature_view.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,7 @@ def to_proto(self) -> OnDemandFeatureViewProto:
328328
owner=self.owner,
329329
write_to_online_store=self.write_to_online_store,
330330
)
331+
print("*" * 40, "\n", spec)
331332

332333
return OnDemandFeatureViewProto(spec=spec, meta=meta)
333334

sdk/python/tests/unit/test_on_demand_python_transformation.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -596,6 +596,9 @@ def python_stored_writes_feature_view(
596596
print("running odfv transform")
597597
return output
598598

599+
assert python_stored_writes_feature_view.entities == [driver.name]
600+
assert python_stored_writes_feature_view.entity_columns == []
601+
599602
self.store.apply(
600603
[
601604
driver,
@@ -605,9 +608,16 @@ def python_stored_writes_feature_view(
605608
]
606609
)
607610
fv_applied = self.store.get_feature_view("driver_hourly_stats")
611+
odfv_applied = self.store.get_on_demand_feature_view(
612+
"python_stored_writes_feature_view"
613+
)
614+
608615
assert fv_applied.entities == [driver.name]
616+
assert odfv_applied.entites == [driver.name]
617+
609618
# Note here that after apply() is called, the entity_columns are populated with the join_key
610619
assert fv_applied.entity_columns[0].name == driver.join_key
620+
assert odfv_applied.entity_columns[0].name == driver.join_key
611621

612622
self.store.write_to_online_store(
613623
feature_view_name="driver_hourly_stats", df=driver_df

0 commit comments

Comments
 (0)