Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
d03a895
merged changes
franciscojavierarceo Sep 24, 2024
e0f42c6
saving progress
franciscojavierarceo Aug 17, 2024
856cea2
merged changes to odfv
franciscojavierarceo Sep 24, 2024
f6a7133
linted
franciscojavierarceo Aug 18, 2024
fe387fc
adding the test needed to show the expected behavior
franciscojavierarceo Aug 18, 2024
646a124
updated test case
franciscojavierarceo Aug 21, 2024
b345e7e
saving progress
franciscojavierarceo Aug 21, 2024
16e68f0
merging
franciscojavierarceo Sep 24, 2024
24c4ae2
merged
franciscojavierarceo Sep 24, 2024
3d21881
merged
franciscojavierarceo Sep 24, 2024
7afaa84
merging
franciscojavierarceo Sep 24, 2024
436478a
adding the entity keys for now to do retrieval
franciscojavierarceo Aug 29, 2024
c78aead
adding entity to odfv
franciscojavierarceo Aug 29, 2024
fbebc63
checking in progress...getting closer
franciscojavierarceo Aug 29, 2024
5b3e2f5
may have to revert some of this...looks like the challenge is getting…
franciscojavierarceo Aug 31, 2024
7879e21
moving things around to make it easier to debug
franciscojavierarceo Sep 1, 2024
72caa15
debugging
franciscojavierarceo Sep 1, 2024
6d23889
merged
franciscojavierarceo Sep 24, 2024
b76bf4e
merging
franciscojavierarceo Sep 24, 2024
3cf0369
Rebasing and merging changes from other PR
franciscojavierarceo Sep 6, 2024
3a6dfc4
Merging changes continued
franciscojavierarceo Sep 7, 2024
1c37e54
update the _make_inference to include odfvs with writes in the update…
franciscojavierarceo Sep 7, 2024
2f9546f
have the table being written now...the create table happens in the Sq…
franciscojavierarceo Sep 8, 2024
c46e157
checking in progress
franciscojavierarceo Sep 9, 2024
580b77f
adding logs
franciscojavierarceo Sep 10, 2024
815a352
updating permissions
franciscojavierarceo Sep 10, 2024
2eda92c
going to error out on purpose
franciscojavierarceo Sep 10, 2024
e326e9b
adding unit test and merging changes
franciscojavierarceo Sep 18, 2024
a54b5f8
almost got everything working and type validation behaving
franciscojavierarceo Sep 18, 2024
029c9cf
cleaned up and have tests behaving
franciscojavierarceo Sep 18, 2024
e2c6b35
adding print
franciscojavierarceo Sep 21, 2024
adf147f
removing print
franciscojavierarceo Sep 21, 2024
d34db1d
checking in progress
franciscojavierarceo Sep 23, 2024
731bacb
updating test
franciscojavierarceo Sep 25, 2024
8479296
adding test
franciscojavierarceo Sep 25, 2024
3068c3b
linted and updated
franciscojavierarceo Sep 25, 2024
238dc29
removed print
franciscojavierarceo Sep 25, 2024
06315b9
updated tests to test actual behavior
franciscojavierarceo Sep 25, 2024
05efa37
checking in progress
franciscojavierarceo Sep 28, 2024
8c559c8
changing typo
franciscojavierarceo Sep 28, 2024
8b8aa16
updating test
franciscojavierarceo Sep 28, 2024
4562a23
testing changes
franciscojavierarceo Sep 28, 2024
14f837d
checking to see if things are still working
franciscojavierarceo Sep 29, 2024
082a860
removed print
franciscojavierarceo Sep 29, 2024
1d52d29
undo change for odfv file
franciscojavierarceo Sep 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
merging
Signed-off-by: Francisco Javier Arceo <farceo@redhat.com>
  • Loading branch information
franciscojavierarceo committed Sep 29, 2024
commit 16e68f096806c5e616689c0b6719f3bd58c8b497
38 changes: 20 additions & 18 deletions sdk/python/feast/infra/passthrough_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import pyarrow as pa
from tqdm import tqdm

from feast import importer
from feast import OnDemandFeatureView, importer
from feast.batch_feature_view import BatchFeatureView
from feast.data_source import DataSource
from feast.entity import Entity
Expand Down Expand Up @@ -278,25 +278,27 @@ def ingest_df(
df: pd.DataFrame,
):
table = pa.Table.from_pandas(df)
if isinstance(feature_view, OnDemandFeatureView):
# TODO: Update this to support On Demand Feature Views.
pass
else:
# Note: A dictionary mapping of column names in this data
# source to feature names in a feature table or view. Only used for feature
# columns, not entity or timestamp columns.
if feature_view.batch_source.field_mapping is not None:
table = _run_pyarrow_field_mapping(
table, feature_view.batch_source.field_mapping
)

# TODO: Update this to support On Demand Feature Views.
# Note: A dictionary mapping of column names in this data
# source to feature names in a feature table or view. Only used for feature
# columns, not entity or timestamp columns.
if feature_view.batch_source.field_mapping is not None:
table = _run_pyarrow_field_mapping(
table, feature_view.batch_source.field_mapping
)

join_keys = {
entity.name: entity.dtype.to_value_type()
for entity in feature_view.entity_columns
}
rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys)
join_keys = {
entity.name: entity.dtype.to_value_type()
for entity in feature_view.entity_columns
}
rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys)

self.online_write_batch(
self.repo_config, feature_view, rows_to_write, progress=None
)
self.online_write_batch(
self.repo_config, feature_view, rows_to_write, progress=None
)

def ingest_df_to_offline_store(self, feature_view: FeatureView, table: pa.Table):
if feature_view.batch_source.field_mapping is not None:
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/on_demand_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import inspect
import warnings
from types import FunctionType
from typing import Any, List, Optional, Union, get_type_hints
from typing import Any, Optional, Union, get_type_hints, List

import dill
import pandas as pd
Expand Down
30 changes: 28 additions & 2 deletions sdk/python/feast/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
from enum import Enum
from typing import Dict, Union

import pyarrow

from feast.value_type import ValueType

PRIMITIVE_FEAST_TYPES_TO_VALUE_TYPES = {
Expand Down Expand Up @@ -103,7 +105,6 @@ def __hash__(self):
Float64 = PrimitiveFeastType.FLOAT64
UnixTimestamp = PrimitiveFeastType.UNIX_TIMESTAMP


SUPPORTED_BASE_TYPES = [
Invalid,
String,
Expand Down Expand Up @@ -159,7 +160,6 @@ def __str__(self):

FeastType = Union[ComplexFeastType, PrimitiveFeastType]


VALUE_TYPES_TO_FEAST_TYPES: Dict["ValueType", FeastType] = {
ValueType.UNKNOWN: Invalid,
ValueType.BYTES: Bytes,
Expand All @@ -180,6 +180,32 @@ def __str__(self):
ValueType.UNIX_TIMESTAMP_LIST: Array(UnixTimestamp),
}

# Mapping from Feast primitive types to their PyArrow equivalents, used by
# from_feast_to_pyarrow_type() when constructing pyarrow arrays/schemas from
# Feast feature definitions.
FEAST_TYPES_TO_PYARROW_TYPES = {
    String: pyarrow.string(),
    Bool: pyarrow.bool_(),
    Int32: pyarrow.int32(),
    Int64: pyarrow.int64(),
    Float32: pyarrow.float32(),
    Float64: pyarrow.float64(),
    # pyarrow.timestamp() requires a unit argument; calling it with no
    # arguments raises TypeError at import time. Use microsecond precision,
    # which matches the precision pandas/Arrow conversions commonly use.
    UnixTimestamp: pyarrow.timestamp("us"),
}


def from_feast_to_pyarrow_type(feast_type: FeastType) -> pyarrow.DataType:
    """
    Converts a Feast type to a PyArrow type.

    Args:
        feast_type: The Feast type to be converted.

    Returns:
        The corresponding ``pyarrow.DataType``.

    Raises:
        ValueError: The conversion could not be performed.
    """
    # Single dict lookup; every mapped value is a pyarrow type, never None,
    # so None reliably signals an unmapped Feast type.
    pyarrow_type = FEAST_TYPES_TO_PYARROW_TYPES.get(feast_type)
    if pyarrow_type is not None:
        return pyarrow_type

    raise ValueError(f"Could not convert Feast type {feast_type} to PyArrow type.")


def from_value_type(
value_type: ValueType,
Expand Down
87 changes: 87 additions & 0 deletions sdk/python/feast/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
from feast.protos.feast.types.Value_pb2 import RepeatedValue as RepeatedValueProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.type_map import python_values_to_proto_values
from feast.types import from_feast_to_pyarrow_type
from feast.value_type import ValueType
from feast.version import get_version

Expand Down Expand Up @@ -230,6 +231,78 @@ def _convert_arrow_to_proto(
table: Union[pyarrow.Table, pyarrow.RecordBatch],
feature_view: "FeatureView",
join_keys: Dict[str, ValueType],
) -> List[Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]]:
if isinstance(feature_view, OnDemandFeatureView):
return _convert_arrow_odfv_to_proto(table, feature_view, join_keys)
else:
return _convert_arrow_fv_to_proto(table, feature_view, join_keys)


def _convert_arrow_fv_to_proto(
    table: Union[pyarrow.Table, pyarrow.RecordBatch],
    feature_view: "FeatureView",
    join_keys: Dict[str, ValueType],
) -> List[Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]]:
    """Convert an Arrow table/batch into per-row proto tuples for a regular FeatureView.

    Args:
        table: Arrow data containing one column per feature, per join key, and
            the batch source's timestamp column(s).
        feature_view: The FeatureView whose ``features``, ``batch_source``
            timestamp fields, and field types drive the conversion.
        join_keys: Mapping of join-key column name to its Feast ValueType.

    Returns:
        One tuple per row: (entity key proto, {feature name: value proto},
        event timestamp, created timestamp or None).
    """
    # Avoid ChunkedArrays which guarantees `zero_copy_only` available.
    # NOTE(review): only the first batch is taken — assumes the Table fits in
    # one RecordBatch; rows in later batches would be dropped. TODO confirm.
    if isinstance(table, pyarrow.Table):
        table = table.to_batches()[0]

    # Columns to serialize: every declared feature plus every join key,
    # paired with the Feast ValueType used for proto conversion.
    columns = [
        (field.name, field.dtype.to_value_type()) for field in feature_view.features
    ] + list(join_keys.items())

    # Convert each column to protos in bulk (one list of ValueProto per column).
    proto_values_by_column = {
        column: python_values_to_proto_values(
            table.column(column).to_numpy(zero_copy_only=False), value_type
        )
        for column, value_type in columns
    }

    # Build one EntityKeyProto per row from the join-key columns.
    entity_keys = [
        EntityKeyProto(
            join_keys=join_keys,
            entity_values=[proto_values_by_column[k][idx] for k in join_keys],
        )
        for idx in range(table.num_rows)
    ]

    # Serialize the features per row: transpose the per-column proto lists
    # into one {feature name: ValueProto} dict per row.
    feature_dict = {
        feature.name: proto_values_by_column[feature.name]
        for feature in feature_view.features
    }
    features = [dict(zip(feature_dict, vars)) for vars in zip(*feature_dict.values())]

    # Convert event_timestamps
    event_timestamps = [
        _coerce_datetime(val)
        for val in pd.to_datetime(
            table.column(feature_view.batch_source.timestamp_field).to_numpy(
                zero_copy_only=False
            )
        )
    ]

    # Convert created_timestamps if they exist
    if feature_view.batch_source.created_timestamp_column:
        created_timestamps = [
            _coerce_datetime(val)
            for val in pd.to_datetime(
                table.column(
                    feature_view.batch_source.created_timestamp_column
                ).to_numpy(zero_copy_only=False)
            )
        ]
    else:
        # No created-timestamp column configured: pad with None per row.
        created_timestamps = [None] * table.num_rows

    return list(zip(entity_keys, features, event_timestamps, created_timestamps))


def _convert_arrow_odfv_to_proto(
table: Union[pyarrow.Table, pyarrow.RecordBatch],
feature_view: "FeatureView",
join_keys: Dict[str, ValueType],
) -> List[Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]]:
# Avoid ChunkedArrays which guarantees `zero_copy_only` available.
if isinstance(table, pyarrow.Table):
Expand All @@ -244,7 +317,21 @@ def _convert_arrow_to_proto(
table.column(column).to_numpy(zero_copy_only=False), value_type
)
for column, value_type in columns
if column in table.column_names
}
# Adding On Demand Features
for feature in feature_view.features:
if feature.name in [c[0] for c in columns]:
# initializing the column as null
proto_values_by_column[feature.name] = python_values_to_proto_values(
table.append_column(
feature.name,
pyarrow.array(
[None] * table.shape[0],
type=from_feast_to_pyarrow_type(feature.dtype),
),
),
)

entity_keys = [
EntityKeyProto(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,6 @@ def test_stored_writes(self):
"current_datetime",
]
)
print(online_python_response)
# Now this is where we need to test the stored writes, this should return the same output as the previous


class TestOnDemandPythonTransformationAllDataTypes(unittest.TestCase):
Expand Down