Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
66 commits
Select commit Hold shift + click to select a range
508bad2
merged changes
franciscojavierarceo Sep 24, 2024
899f7f4
saving progress
franciscojavierarceo Aug 17, 2024
5b69c71
merged changes to odfv
franciscojavierarceo Sep 24, 2024
d107354
linted
franciscojavierarceo Aug 18, 2024
a34ec4d
adding the test needed to show the expected behavior
franciscojavierarceo Aug 18, 2024
b95d2a2
updated test case
franciscojavierarceo Aug 21, 2024
47974c2
saving progress
franciscojavierarceo Aug 21, 2024
ceb75a2
merging
franciscojavierarceo Sep 24, 2024
6688933
merged
franciscojavierarceo Sep 24, 2024
4c28acc
merged
franciscojavierarceo Sep 24, 2024
fd577dc
merging
franciscojavierarceo Sep 24, 2024
167fe6c
adding the entity keys for now to do retrieval
franciscojavierarceo Aug 29, 2024
54811d7
adding entity to odfv
franciscojavierarceo Aug 29, 2024
f0d87fc
checking in progress...getting closer
franciscojavierarceo Aug 29, 2024
b7091ca
may have to revert some of this...looks like the challenge is getting…
franciscojavierarceo Aug 31, 2024
d2a12f8
moving things around to make it easier to debug
franciscojavierarceo Sep 1, 2024
9d496ba
debugging
franciscojavierarceo Sep 1, 2024
58280aa
merged
franciscojavierarceo Sep 24, 2024
c42be75
merging
franciscojavierarceo Sep 24, 2024
fb3b315
Rebasing and merging changes from other PR
franciscojavierarceo Sep 6, 2024
82f3f8b
Merging changes continued
franciscojavierarceo Sep 7, 2024
172693d
update the _make_inference to include odfvs with writes in the update…
franciscojavierarceo Sep 7, 2024
66c5b57
have the table being written now...the create table happens in the Sq…
franciscojavierarceo Sep 8, 2024
ea3b98a
checking in progress
franciscojavierarceo Sep 9, 2024
905912b
adding logs
franciscojavierarceo Sep 10, 2024
25d42dd
updating permissions
franciscojavierarceo Sep 10, 2024
03d6116
going to error out on purpose
franciscojavierarceo Sep 10, 2024
9b16615
adding unit test and merging changes
franciscojavierarceo Sep 18, 2024
adbaeb6
almost got everything working and type validation behaving
franciscojavierarceo Sep 18, 2024
11d2914
cleaned up and have tests behaving
franciscojavierarceo Sep 18, 2024
64375ee
adding print
franciscojavierarceo Sep 21, 2024
3e6912a
removing print
franciscojavierarceo Sep 21, 2024
5751a72
checking in progress
franciscojavierarceo Sep 23, 2024
b0208e1
updating test
franciscojavierarceo Sep 25, 2024
463d8bb
adding test
franciscojavierarceo Sep 25, 2024
2981817
linted and updated
franciscojavierarceo Sep 25, 2024
3a33368
removed print
franciscojavierarceo Sep 25, 2024
22bf637
updated tests to test actual behavior
franciscojavierarceo Sep 25, 2024
0d0d117
checking in progress
franciscojavierarceo Sep 28, 2024
5bff836
changing typo
franciscojavierarceo Sep 28, 2024
271f814
updating test
franciscojavierarceo Sep 28, 2024
754b0e8
testing changes
franciscojavierarceo Sep 28, 2024
25c7181
checking to see if thing still working
franciscojavierarceo Sep 29, 2024
1d4023f
removed print
franciscojavierarceo Sep 29, 2024
3662102
undo change for odfv file
franciscojavierarceo Sep 29, 2024
74e7ede
updated tests
franciscojavierarceo Sep 30, 2024
59940cf
okay well have the unit test working
franciscojavierarceo Oct 1, 2024
b223feb
type changes, hope i dont regret them
franciscojavierarceo Oct 1, 2024
01770e2
updated stream feature view piece
franciscojavierarceo Oct 2, 2024
7606481
updated sfv ifelse
franciscojavierarceo Oct 2, 2024
c4ebf18
removing print
franciscojavierarceo Oct 2, 2024
72add32
formatted and updated test
franciscojavierarceo Oct 2, 2024
24e0a84
resolving some linter errors
franciscojavierarceo Oct 3, 2024
b92bf32
fixed linter and formatting
franciscojavierarceo Oct 3, 2024
934c1e9
okay think it is working
franciscojavierarceo Oct 3, 2024
0ce93f2
linter
franciscojavierarceo Oct 3, 2024
bf31d59
updated type map for integration tests
franciscojavierarceo Oct 4, 2024
9aff889
Merge branch 'master' into podfv2
franciscojavierarceo Oct 5, 2024
acc7b7f
updated local feature store test
franciscojavierarceo Oct 5, 2024
12d9100
fixed local fs test
franciscojavierarceo Oct 5, 2024
3c94614
chore: Updated snowflake test to be more explicit about post apply en…
franciscojavierarceo Oct 5, 2024
f01b691
merging
franciscojavierarceo Oct 5, 2024
c26ae75
fixed test to entity_rows_to_read
franciscojavierarceo Oct 5, 2024
c6d55c7
resolved inf conflicts
franciscojavierarceo Oct 5, 2024
52b8c4d
lint
franciscojavierarceo Oct 5, 2024
ca0971a
Updated tests and lint, think I have everything working
franciscojavierarceo Oct 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
type changes, hope i dont regret them
Signed-off-by: Francisco Javier Arceo <farceo@redhat.com>
  • Loading branch information
franciscojavierarceo committed Oct 1, 2024
commit b223feb88dd2ef0f9f08d6095bfd5524df5276fc
4 changes: 3 additions & 1 deletion sdk/python/feast/feature_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,9 @@ def __init__(
if isinstance(feature_grouping, BaseFeatureView):
self.feature_view_projections.append(feature_grouping.projection)

def infer_features(self, fvs_to_update: Dict[str, FeatureView]):
def infer_features(
self, fvs_to_update: Dict[str, Union[FeatureView, BaseFeatureView]]
):
"""
Infers the features for the projections of this feature service, and updates this feature
service in place.
Expand Down
22 changes: 9 additions & 13 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -893,10 +893,11 @@ def apply(
data_sources_set_to_update.add(fv.stream_source)
if isinstance(fv, OnDemandFeatureView):
for source_fvp in fv.source_feature_view_projections:
if fv.source_feature_view_projections[source_fvp].batch_source:
data_sources_set_to_update.add(
fv.source_feature_view_projections[source_fvp].batch_source
)
odfv_batch_source: Optional[DataSource] = (
fv.source_feature_view_projections[source_fvp].batch_source
)
if odfv_batch_source is not None:
data_sources_set_to_update.add(odfv_batch_source)
else:
pass

Expand Down Expand Up @@ -1018,9 +1019,9 @@ def apply(
tables_to_delete: List[FeatureView] = (
views_to_delete + sfvs_to_delete if not partial else [] # type: ignore
)
tables_to_keep: List[FeatureView] = (
views_to_update + sfvs_to_update + odfvs_with_writes_to_update
) # type: ignore
tables_to_keep: List[
Union[FeatureView, StreamFeatureView, OnDemandFeatureView]
] = views_to_update + sfvs_to_update + odfvs_with_writes_to_update # type: ignore

self._get_provider().update_infra(
project=self.project,
Expand Down Expand Up @@ -1503,12 +1504,7 @@ def write_to_online_store(
raise DataFrameSerializationError

provider = self._get_provider()
if isinstance(feature_view, OnDemandFeatureView):
# TODO: add projection mapping
projection_mapping = {}
provider.ingest_df(feature_view, df, projection_mapping)
else:
provider.ingest_df(feature_view, df)
provider.ingest_df(feature_view, df)

def write_to_offline_store(
self,
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ def _infer_features_and_entities(
fv, join_keys, run_inference_for_features, config
)

entity_columns = []
entity_columns: List[Field] = []
columns_to_exclude = {
fv.batch_source.timestamp_field,
fv.batch_source.created_timestamp_column,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from feast.infra.offline_stores.offline_store import OfflineStore
from feast.infra.online_stores.online_store import OnlineStore
from feast.infra.registry.base_registry import BaseRegistry
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.repo_config import RepoConfig
from feast.stream_feature_view import StreamFeatureView

Expand Down Expand Up @@ -89,7 +90,7 @@ def update(
Union[BatchFeatureView, StreamFeatureView, FeatureView]
],
views_to_keep: Sequence[
Union[BatchFeatureView, StreamFeatureView, FeatureView]
Union[BatchFeatureView, StreamFeatureView, FeatureView, OnDemandFeatureView]
],
entities_to_delete: Sequence[Entity],
entities_to_keep: Sequence[Entity],
Expand Down
7 changes: 5 additions & 2 deletions sdk/python/feast/infra/passthrough_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from tqdm import tqdm

from feast import OnDemandFeatureView, importer
from feast.base_feature_view import BaseFeatureView
from feast.batch_feature_view import BatchFeatureView
from feast.data_source import DataSource
from feast.entity import Entity
Expand Down Expand Up @@ -122,7 +123,7 @@ def update_infra(
self,
project: str,
tables_to_delete: Sequence[FeatureView],
tables_to_keep: Sequence[FeatureView],
tables_to_keep: Sequence[Union[FeatureView, OnDemandFeatureView]],
entities_to_delete: Sequence[Entity],
entities_to_keep: Sequence[Entity],
partial: bool,
Expand Down Expand Up @@ -274,12 +275,14 @@ def retrieve_online_documents(

def ingest_df(
self,
feature_view: FeatureView,
feature_view: Union[BaseFeatureView, FeatureView, OnDemandFeatureView],
df: pd.DataFrame,
field_mapping: Optional[Dict] = None,
):
table = pa.Table.from_pandas(df)
if isinstance(feature_view, OnDemandFeatureView):
if not field_mapping:
field_mapping = {}
table = _run_pyarrow_field_mapping(table, field_mapping)
join_keys = {
entity.name: entity.dtype.to_value_type()
Expand Down
8 changes: 6 additions & 2 deletions sdk/python/feast/infra/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@
from tqdm import tqdm

from feast import FeatureService, errors
from feast.base_feature_view import BaseFeatureView
from feast.data_source import DataSource
from feast.entity import Entity
from feast.feature_view import FeatureView
from feast.importer import import_class
from feast.infra.infra_object import Infra
from feast.infra.offline_stores.offline_store import RetrievalJob
from feast.infra.registry.base_registry import BaseRegistry
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.online_response import OnlineResponse
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
Expand Down Expand Up @@ -47,7 +49,7 @@ def update_infra(
self,
project: str,
tables_to_delete: Sequence[FeatureView],
tables_to_keep: Sequence[FeatureView],
tables_to_keep: Sequence[Union[FeatureView, OnDemandFeatureView]],
entities_to_delete: Sequence[Entity],
entities_to_keep: Sequence[Entity],
partial: bool,
Expand Down Expand Up @@ -125,15 +127,17 @@ def online_write_batch(

def ingest_df(
self,
feature_view: FeatureView,
feature_view: Union[BaseFeatureView, FeatureView, OnDemandFeatureView],
df: pd.DataFrame,
field_mapping: Optional[Dict] = None,
):
"""
Persists a dataframe to the online store.

Args:
feature_view: The feature view to which the dataframe corresponds.
df: The dataframe to be persisted.
field_mapping: A dictionary mapping dataframe column names to feature names.
"""
pass

Expand Down
3 changes: 3 additions & 0 deletions sdk/python/feast/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,9 @@ def from_feast_to_pyarrow_type(feast_type: FeastType) -> pyarrow.DataType:
Raises:
ValueError: The conversion could not be performed.
"""
assert isinstance(
feast_type, FeastType
), f"Expected FeastType, got {type(feast_type)}"
if feast_type in FEAST_TYPES_TO_PYARROW_TYPES:
return FEAST_TYPES_TO_PYARROW_TYPES[feast_type]

Expand Down
10 changes: 5 additions & 5 deletions sdk/python/feast/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,14 +228,14 @@ def _coerce_datetime(ts):

def _convert_arrow_to_proto(
table: Union[pyarrow.Table, pyarrow.RecordBatch],
feature_view: "FeatureView",
feature_view: Union["FeatureView", "OnDemandFeatureView"],
join_keys: Dict[str, ValueType],
) -> List[Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]]:
# This is a workaround for isinstance(feature_view, OnDemandFeatureView), which triggers a circular import
if getattr(feature_view, "source_request_sources", None):
return _convert_arrow_odfv_to_proto(table, feature_view, join_keys)
return _convert_arrow_odfv_to_proto(table, feature_view, join_keys) # type: ignore[arg-type]
else:
return _convert_arrow_fv_to_proto(table, feature_view, join_keys)
return _convert_arrow_fv_to_proto(table, feature_view, join_keys) # type: ignore[arg-type]


def _convert_arrow_fv_to_proto(
Expand Down Expand Up @@ -301,7 +301,7 @@ def _convert_arrow_fv_to_proto(

def _convert_arrow_odfv_to_proto(
table: Union[pyarrow.Table, pyarrow.RecordBatch],
feature_view: "FeatureView",
feature_view: "OnDemandFeatureView",
join_keys: Dict[str, ValueType],
) -> List[Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]]:
# Avoid ChunkedArrays which guarantees `zero_copy_only` available.
Expand Down Expand Up @@ -1013,7 +1013,7 @@ def _prepare_entities_to_read_from_online_store(

num_rows = _validate_entity_values(entity_proto_values)

odfv_entities = []
odfv_entities: List[Entity] = []
request_source_keys = []
for on_demand_feature_view in requested_on_demand_feature_views:
odfv_entities.append(*getattr(on_demand_feature_view, "entities", None))
Expand Down