Skip to content

Commit 16e68f0

Browse files
merging
Signed-off-by: Francisco Javier Arceo <farceo@redhat.com>
1 parent b345e7e commit 16e68f0

File tree

5 files changed

+136
-23
lines changed

5 files changed

+136
-23
lines changed

sdk/python/feast/infra/passthrough_provider.py

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import pyarrow as pa
66
from tqdm import tqdm
77

8-
from feast import importer
8+
from feast import OnDemandFeatureView, importer
99
from feast.batch_feature_view import BatchFeatureView
1010
from feast.data_source import DataSource
1111
from feast.entity import Entity
@@ -278,25 +278,27 @@ def ingest_df(
278278
df: pd.DataFrame,
279279
):
280280
table = pa.Table.from_pandas(df)
281+
if isinstance(feature_view, OnDemandFeatureView):
282+
# TODO: Update this to support On Demand Feature Views.
283+
pass
284+
else:
285+
# Note: A dictionary mapping of column names in this data
286+
# source to feature names in a feature table or view. Only used for feature
287+
# columns, not entity or timestamp columns.
288+
if feature_view.batch_source.field_mapping is not None:
289+
table = _run_pyarrow_field_mapping(
290+
table, feature_view.batch_source.field_mapping
291+
)
281292

282-
# TODO: Update this to support On Demand Feature Views.
283-
# Note: A dictionary mapping of column names in this data
284-
# source to feature names in a feature table or view. Only used for feature
285-
# columns, not entity or timestamp columns.
286-
if feature_view.batch_source.field_mapping is not None:
287-
table = _run_pyarrow_field_mapping(
288-
table, feature_view.batch_source.field_mapping
289-
)
290-
291-
join_keys = {
292-
entity.name: entity.dtype.to_value_type()
293-
for entity in feature_view.entity_columns
294-
}
295-
rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys)
293+
join_keys = {
294+
entity.name: entity.dtype.to_value_type()
295+
for entity in feature_view.entity_columns
296+
}
297+
rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys)
296298

297-
self.online_write_batch(
298-
self.repo_config, feature_view, rows_to_write, progress=None
299-
)
299+
self.online_write_batch(
300+
self.repo_config, feature_view, rows_to_write, progress=None
301+
)
300302

301303
def ingest_df_to_offline_store(self, feature_view: FeatureView, table: pa.Table):
302304
if feature_view.batch_source.field_mapping is not None:

sdk/python/feast/on_demand_feature_view.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import inspect
44
import warnings
55
from types import FunctionType
6-
from typing import Any, List, Optional, Union, get_type_hints
6+
from typing import Any, List, Optional, Union, get_type_hints
77

88
import dill
99
import pandas as pd

sdk/python/feast/types.py

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
from enum import Enum
1616
from typing import Dict, Union
1717

18+
import pyarrow
19+
1820
from feast.value_type import ValueType
1921

2022
PRIMITIVE_FEAST_TYPES_TO_VALUE_TYPES = {
@@ -103,7 +105,6 @@ def __hash__(self):
103105
Float64 = PrimitiveFeastType.FLOAT64
104106
UnixTimestamp = PrimitiveFeastType.UNIX_TIMESTAMP
105107

106-
107108
SUPPORTED_BASE_TYPES = [
108109
Invalid,
109110
String,
@@ -159,7 +160,6 @@ def __str__(self):
159160

160161
FeastType = Union[ComplexFeastType, PrimitiveFeastType]
161162

162-
163163
VALUE_TYPES_TO_FEAST_TYPES: Dict["ValueType", FeastType] = {
164164
ValueType.UNKNOWN: Invalid,
165165
ValueType.BYTES: Bytes,
@@ -180,6 +180,32 @@ def __str__(self):
180180
ValueType.UNIX_TIMESTAMP_LIST: Array(UnixTimestamp),
181181
}
182182

183+
# Mapping of primitive Feast types to their PyArrow equivalents. Only
# primitive (non-Array) types are supported here; anything absent from this
# table is rejected by from_feast_to_pyarrow_type below.
FEAST_TYPES_TO_PYARROW_TYPES = {
    String: pyarrow.string(),
    Bool: pyarrow.bool_(),
    Int32: pyarrow.int32(),
    Int64: pyarrow.int64(),
    Float32: pyarrow.float32(),
    Float64: pyarrow.float64(),
    # BUG FIX: pyarrow.timestamp requires a unit argument; calling
    # pyarrow.timestamp() with no arguments raises TypeError as soon as this
    # module is imported. "us" (microseconds) is used here — assumes callers
    # expect microsecond precision; TODO confirm against Feast's timestamp
    # handling elsewhere in the SDK.
    UnixTimestamp: pyarrow.timestamp("us"),
}


def from_feast_to_pyarrow_type(feast_type: FeastType) -> pyarrow.DataType:
    """
    Converts a Feast type to a PyArrow type.

    Args:
        feast_type: The Feast type to be converted.

    Returns:
        The equivalent ``pyarrow.DataType``.

    Raises:
        ValueError: The conversion could not be performed (the type has no
            entry in ``FEAST_TYPES_TO_PYARROW_TYPES``).
    """
    if feast_type in FEAST_TYPES_TO_PYARROW_TYPES:
        return FEAST_TYPES_TO_PYARROW_TYPES[feast_type]

    raise ValueError(f"Could not convert Feast type {feast_type} to PyArrow type.")
208+
183209

184210
def from_value_type(
185211
value_type: ValueType,

sdk/python/feast/utils.py

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
from feast.protos.feast.types.Value_pb2 import RepeatedValue as RepeatedValueProto
4444
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
4545
from feast.type_map import python_values_to_proto_values
46+
from feast.types import from_feast_to_pyarrow_type
4647
from feast.value_type import ValueType
4748
from feast.version import get_version
4849

def _convert_arrow_to_proto(
    table: Union[pyarrow.Table, pyarrow.RecordBatch],
    feature_view: "FeatureView",
    join_keys: Dict[str, ValueType],
) -> List[Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]]:
    """
    Converts an Arrow table into rows of entity-key/feature protos suitable
    for online writes.

    Dispatches on the concrete view type: on-demand feature views go through
    the ODFV-specific conversion, everything else through the regular
    feature-view conversion. Arguments are forwarded unchanged.
    """
    if isinstance(feature_view, OnDemandFeatureView):
        return _convert_arrow_odfv_to_proto(table, feature_view, join_keys)
    return _convert_arrow_fv_to_proto(table, feature_view, join_keys)
241+
def _convert_arrow_fv_to_proto(
    table: Union[pyarrow.Table, pyarrow.RecordBatch],
    feature_view: "FeatureView",
    join_keys: Dict[str, ValueType],
) -> List[Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]]:
    """
    Converts Arrow data for a regular (non-on-demand) feature view into a list
    of ``(entity_key, feature_values, event_timestamp, created_timestamp)``
    tuples ready for ``online_write_batch``.

    Args:
        table: The source data. A ``pyarrow.Table`` is reduced to its first
            RecordBatch so column conversion never sees a ChunkedArray.
        feature_view: The feature view whose features, batch-source timestamp
            field, and (optional) created-timestamp column are read.
        join_keys: Mapping of join-key name to its ``ValueType``.
    """
    # Avoid ChunkedArrays which guarantees `zero_copy_only` available.
    if isinstance(table, pyarrow.Table):
        table = table.to_batches()[0]

    # Feature columns plus join-key columns, each paired with its value type.
    columns = [
        (field.name, field.dtype.to_value_type()) for field in feature_view.features
    ] + list(join_keys.items())

    proto_values_by_column = {
        column: python_values_to_proto_values(
            table.column(column).to_numpy(zero_copy_only=False), value_type
        )
        for column, value_type in columns
    }

    entity_keys = [
        EntityKeyProto(
            join_keys=join_keys,
            entity_values=[proto_values_by_column[k][idx] for k in join_keys],
        )
        for idx in range(table.num_rows)
    ]

    # Serialize the features per row.
    feature_dict = {
        feature.name: proto_values_by_column[feature.name]
        for feature in feature_view.features
    }
    # BUG FIX: the per-row loop variable was named `vars`, shadowing the
    # `vars` builtin; renamed to `row_values` (no behavior change).
    features = [
        dict(zip(feature_dict, row_values))
        for row_values in zip(*feature_dict.values())
    ]

    # Convert event timestamps.
    event_timestamps = [
        _coerce_datetime(val)
        for val in pd.to_datetime(
            table.column(feature_view.batch_source.timestamp_field).to_numpy(
                zero_copy_only=False
            )
        )
    ]

    # Convert created timestamps when the batch source defines that column;
    # otherwise pad with None so the zip below stays row-aligned.
    if feature_view.batch_source.created_timestamp_column:
        created_timestamps = [
            _coerce_datetime(val)
            for val in pd.to_datetime(
                table.column(
                    feature_view.batch_source.created_timestamp_column
                ).to_numpy(zero_copy_only=False)
            )
        ]
    else:
        created_timestamps = [None] * table.num_rows

    return list(zip(entity_keys, features, event_timestamps, created_timestamps))
300+
301+
302+
def _convert_arrow_odfv_to_proto(
303+
table: Union[pyarrow.Table, pyarrow.RecordBatch],
304+
feature_view: "FeatureView",
305+
join_keys: Dict[str, ValueType],
233306
) -> List[Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]]:
234307
# Avoid ChunkedArrays which guarantees `zero_copy_only` available.
235308
if isinstance(table, pyarrow.Table):
@@ -244,7 +317,21 @@ def _convert_arrow_to_proto(
244317
table.column(column).to_numpy(zero_copy_only=False), value_type
245318
)
246319
for column, value_type in columns
320+
if column in table.column_names
247321
}
322+
# Adding On Demand Features
323+
for feature in feature_view.features:
324+
if feature.name in [c[0] for c in columns]:
325+
# initializing the column as null
326+
proto_values_by_column[feature.name] = python_values_to_proto_values(
327+
table.append_column(
328+
feature.name,
329+
pyarrow.array(
330+
[None] * table.shape[0],
331+
type=from_feast_to_pyarrow_type(feature.dtype),
332+
),
333+
),
334+
)
248335

249336
entity_keys = [
250337
EntityKeyProto(

sdk/python/tests/unit/test_on_demand_python_transformation.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -311,8 +311,6 @@ def test_stored_writes(self):
311311
"current_datetime",
312312
]
313313
)
314-
print(online_python_response)
315-
# Now this is where we need to test the stored writes, this should return the same output as the previous
316314

317315

318316
class TestOnDemandPythonTransformationAllDataTypes(unittest.TestCase):

0 commit comments

Comments
 (0)