Skip to content

Commit cc00315

Browse files
authored
Implement materialize method (feast-dev#1379)
* WIP Ingest into Firestore — Signed-off-by: Jacob Klegar <jacob@tecton.ai>
* Full materialize function — Signed-off-by: Jacob Klegar <jacob@tecton.ai>
* Rebase — Signed-off-by: Jacob Klegar <jacob@tecton.ai>
* Add basic ingestion integration test — Signed-off-by: Jacob Klegar <jacob@tecton.ai>
* Update created_ts to use column or null instead of current ts — Signed-off-by: Jacob Klegar <jacob@tecton.ai>
* Update feast types import — Signed-off-by: Jacob Klegar <jacob@tecton.ai>
* lint — Signed-off-by: Jacob Klegar <jacob@tecton.ai>
* Address comments — Signed-off-by: Jacob Klegar <jacob@tecton.ai>
* Use existing type map function — Signed-off-by: Jacob Klegar <jacob@tecton.ai>
* Address comments — Signed-off-by: Jacob Klegar <jacob@tecton.ai>
* Address comments — Signed-off-by: Jacob Klegar <jacob@tecton.ai>
* Add comment — Signed-off-by: Jacob Klegar <jacob@tecton.ai>
1 parent 3ae52ff commit cc00315

File tree

9 files changed

+357
-101
lines changed

9 files changed

+357
-101
lines changed

sdk/python/feast/feature_store.py

Lines changed: 193 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,22 +11,32 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14+
from datetime import datetime
1415
from pathlib import Path
15-
from typing import List, Optional, Union
16+
from typing import Dict, List, Optional, Tuple, Union
1617

1718
import pandas as pd
19+
import pyarrow
1820

21+
from feast.data_source import FileSource
1922
from feast.entity import Entity
2023
from feast.feature_view import FeatureView
2124
from feast.infra.provider import Provider, get_provider
22-
from feast.offline_store import RetrievalJob, get_offline_store_for_retrieval
25+
from feast.offline_store import (
26+
RetrievalJob,
27+
get_offline_store,
28+
get_offline_store_for_retrieval,
29+
)
30+
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
31+
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
2332
from feast.registry import Registry
2433
from feast.repo_config import (
2534
LocalOnlineStoreConfig,
2635
OnlineStoreConfig,
2736
RepoConfig,
2837
load_repo_config,
2938
)
39+
from feast.type_map import python_value_to_proto_value
3040

3141

3242
class FeatureStore:
@@ -153,6 +163,88 @@ def get_historical_features(
153163
)
154164
return job
155165

166+
def materialize(
    self,
    feature_views: Optional[List[str]],
    start_date: datetime,
    end_date: datetime,
) -> None:
    """
    Materialize data from the offline store into the online store.

    This method loads feature data in the specified interval from either
    the specified feature views, or all feature views if none are specified,
    into the online store where it is available for online serving.

    Args:
        feature_views (List[str]): Optional list of feature view names. If selected, will only run
            materialization for the specified feature views. Pass None to materialize all
            feature views registered in the project.
        start_date (datetime): Start date for time range of data to materialize into the online store
        end_date (datetime): End date for time range of data to materialize into the online store

    Raises:
        NotImplementedError: If a selected feature view is backed by a FileSource,
            or its source has no table_ref.

    Examples:
        Materialize all features into the online store over the interval
        from 3 hours ago to 10 minutes ago.
        >>> from datetime import datetime, timedelta
        >>> from feast.feature_store import FeatureStore
        >>>
        >>> fs = FeatureStore(config=RepoConfig(provider="gcp"))
        >>> fs.materialize(
        >>>     feature_views=None,
        >>>     start_date=datetime.utcnow() - timedelta(hours=3),
        >>>     end_date=datetime.utcnow() - timedelta(minutes=10)
        >>> )
    """
    registry = self._get_registry()
    if feature_views is None:
        # No explicit selection: materialize every feature view in the project.
        feature_views_to_materialize = registry.list_feature_views(
            self.config.project
        )
    else:
        feature_views_to_materialize = [
            registry.get_feature_view(name, self.config.project)
            for name in feature_views
        ]

    # These handles are loop-invariant; resolve them once rather than once
    # per feature view.
    offline_store = get_offline_store(self.config)
    provider = self._get_provider()

    # TODO paging large loads
    for feature_view in feature_views_to_materialize:
        if isinstance(feature_view.input, FileSource):
            raise NotImplementedError(
                "This function is not yet implemented for File data sources"
            )
        if not feature_view.input.table_ref:
            raise NotImplementedError(
                f"This function is only implemented for FeatureViews with a table_ref; {feature_view.name} does not have one."
            )
        # Map the view's logical column names back to the physical source
        # columns before querying the offline store.
        (
            entity_names,
            feature_names,
            event_timestamp_column,
            created_timestamp_column,
        ) = _run_reverse_field_mapping(feature_view)

        table = offline_store.pull_latest_from_table(
            feature_view.input,
            entity_names,
            feature_names,
            event_timestamp_column,
            created_timestamp_column,
            start_date,
            end_date,
        )

        if feature_view.input.field_mapping is not None:
            # Rename physical source columns forward to the view's names.
            table = _run_forward_field_mapping(
                table, feature_view.input.field_mapping
            )

        rows_to_write = _convert_arrow_to_proto(table, feature_view)
        provider.online_write_batch(
            self.config.project, feature_view, rows_to_write
        )
247+
156248

157249
def _get_requested_feature_views(
158250
feature_refs: List[str], all_feature_views: List[FeatureView]
@@ -176,3 +268,102 @@ def _get_requested_feature_views(
176268
feature_views_list.append(view)
177269

178270
return feature_views_list
271+
272+
273+
def _run_reverse_field_mapping(
274+
feature_view: FeatureView,
275+
) -> Tuple[List[str], List[str], str, Optional[str]]:
276+
"""
277+
If a field mapping exists, run it in reverse on the entity names,
278+
feature names, event timestamp column, and created timestamp column
279+
to get the names of the relevant columns in the BigQuery table.
280+
281+
Args:
282+
feature_view: FeatureView object containing the field mapping
283+
as well as the names to reverse-map.
284+
Returns:
285+
Tuple containing the list of reverse-mapped entity names,
286+
reverse-mapped feature names, reverse-mapped event timestamp column,
287+
and reverse-mapped created timestamp column that will be passed into
288+
the query to the offline store.
289+
"""
290+
# if we have mapped fields, use the original field names in the call to the offline store
291+
event_timestamp_column = feature_view.input.event_timestamp_column
292+
entity_names = [entity for entity in feature_view.entities]
293+
feature_names = [feature.name for feature in feature_view.features]
294+
created_timestamp_column = feature_view.input.created_timestamp_column
295+
if feature_view.input.field_mapping is not None:
296+
reverse_field_mapping = {
297+
v: k for k, v in feature_view.input.field_mapping.items()
298+
}
299+
event_timestamp_column = (
300+
reverse_field_mapping[event_timestamp_column]
301+
if event_timestamp_column in reverse_field_mapping.keys()
302+
else event_timestamp_column
303+
)
304+
created_timestamp_column = (
305+
reverse_field_mapping[created_timestamp_column]
306+
if created_timestamp_column
307+
and created_timestamp_column in reverse_field_mapping.keys()
308+
else created_timestamp_column
309+
)
310+
entity_names = [
311+
reverse_field_mapping[col] if col in reverse_field_mapping.keys() else col
312+
for col in entity_names
313+
]
314+
feature_names = [
315+
reverse_field_mapping[col] if col in reverse_field_mapping.keys() else col
316+
for col in feature_names
317+
]
318+
return (
319+
entity_names,
320+
feature_names,
321+
event_timestamp_column,
322+
created_timestamp_column,
323+
)
324+
325+
326+
def _run_forward_field_mapping(
327+
table: pyarrow.Table, field_mapping: Dict[str, str],
328+
) -> pyarrow.Table:
329+
# run field mapping in the forward direction
330+
cols = table.column_names
331+
mapped_cols = [
332+
field_mapping[col] if col in field_mapping.keys() else col for col in cols
333+
]
334+
table = table.rename_columns(mapped_cols)
335+
return table
336+
337+
338+
def _convert_arrow_to_proto(
    table: pyarrow.Table, feature_view: FeatureView
) -> List[Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]]:
    """
    Convert a pyarrow Table into the row tuples consumed by
    Provider.online_write_batch: (entity key proto, {feature name: value proto},
    event timestamp, created timestamp or None).

    Args:
        table: pyarrow Table whose columns include the view's entities,
            features, and timestamp columns.
        feature_view: FeatureView describing which columns to extract.
    Returns:
        One (EntityKeyProto, feature dict, event_ts, created_ts) tuple per row.
    """
    # Resolve every column position once up front; calling .index() per row
    # would make the conversion O(rows * columns).
    column_names = table.column_names
    entity_indices = [
        (entity_name, column_names.index(entity_name))
        for entity_name in feature_view.entities
    ]
    feature_indices = [
        (feature.name, column_names.index(feature.name))
        for feature in feature_view.features
    ]
    event_timestamp_idx = column_names.index(
        feature_view.input.event_timestamp_column
    )
    created_timestamp_idx = (
        column_names.index(feature_view.input.created_timestamp_column)
        if feature_view.input.created_timestamp_column is not None
        else None
    )

    rows_to_write = []
    # to_pydict() preserves column order, so zipping its values yields rows
    # whose positions line up with column_names.
    for row in zip(*table.to_pydict().values()):
        entity_key = EntityKeyProto()
        for entity_name, idx in entity_indices:
            entity_key.entity_names.append(entity_name)
            entity_key.entity_values.append(python_value_to_proto_value(row[idx]))
        feature_dict = {
            name: python_value_to_proto_value(row[idx])
            for name, idx in feature_indices
        }
        event_timestamp = row[event_timestamp_idx]
        created_timestamp = (
            row[created_timestamp_idx] if created_timestamp_idx is not None else None
        )
        rows_to_write.append(
            (entity_key, feature_dict, event_timestamp, created_timestamp)
        )
    return rows_to_write

sdk/python/feast/infra/gcp.py

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -106,14 +106,15 @@ def online_write_batch(
106106
self,
107107
project: str,
108108
table: Union[FeatureTable, FeatureView],
109-
data: List[Tuple[EntityKeyProto, Dict[str, ValueProto], datetime]],
110-
created_ts: datetime,
109+
data: List[
110+
Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
111+
],
111112
) -> None:
112113
from google.cloud import datastore
113114

114115
client = self._initialize_client()
115116

116-
for entity_key, features, timestamp in data:
117+
for entity_key, features, timestamp, created_ts in data:
117118
document_id = compute_datastore_entity_id(entity_key)
118119

119120
key = client.key(
@@ -125,9 +126,12 @@ def online_write_batch(
125126
if entity["event_ts"] > _make_tzaware(timestamp):
126127
# Do not overwrite feature values computed from fresher data
127128
continue
128-
elif entity["event_ts"] == _make_tzaware(timestamp) and entity[
129-
"created_ts"
130-
] > _make_tzaware(created_ts):
129+
elif (
130+
entity["event_ts"] == _make_tzaware(timestamp)
131+
and created_ts is not None
132+
and entity["created_ts"] is not None
133+
and entity["created_ts"] > _make_tzaware(created_ts)
134+
):
131135
# Do not overwrite feature values computed from the same data, but
132136
# computed later than this one
133137
continue
@@ -139,7 +143,11 @@ def online_write_batch(
139143
key=entity_key.SerializeToString(),
140144
values={k: v.SerializeToString() for k, v in features.items()},
141145
event_ts=_make_tzaware(timestamp),
142-
created_ts=_make_tzaware(created_ts),
146+
created_ts=(
147+
_make_tzaware(created_ts)
148+
if created_ts is not None
149+
else None
150+
),
143151
)
144152
)
145153
client.put(entity)

sdk/python/feast/infra/local_sqlite.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,21 +53,22 @@ def online_write_batch(
5353
self,
5454
project: str,
5555
table: Union[FeatureTable, FeatureView],
56-
data: List[Tuple[EntityKeyProto, Dict[str, ValueProto], datetime]],
57-
created_ts: datetime,
56+
data: List[
57+
Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
58+
],
5859
) -> None:
5960
conn = self._get_conn()
6061

6162
with conn:
62-
for entity_key, values, timestamp in data:
63+
for entity_key, values, timestamp, created_ts in data:
6364
for feature_name, val in values.items():
6465
entity_key_bin = serialize_entity_key(entity_key)
6566

6667
conn.execute(
6768
f"""
6869
UPDATE {_table_id(project, table)}
6970
SET value = ?, event_ts = ?, created_ts = ?
70-
WHERE (event_ts < ? OR (event_ts = ? AND created_ts < ?))
71+
WHERE (event_ts < ? OR (event_ts = ? AND (created_ts IS NULL OR ? IS NULL OR created_ts < ?)))
7172
AND (entity_key = ? AND feature_name = ?)
7273
""",
7374
(
@@ -79,6 +80,7 @@ def online_write_batch(
7980
timestamp,
8081
timestamp,
8182
created_ts,
83+
created_ts,
8284
entity_key_bin,
8385
feature_name,
8486
),

sdk/python/feast/infra/provider.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,9 @@ def online_write_batch(
4444
self,
4545
project: str,
4646
table: Union[FeatureTable, FeatureView],
47-
data: List[Tuple[EntityKeyProto, Dict[str, ValueProto], datetime]],
48-
created_ts: datetime,
47+
data: List[
48+
Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
49+
],
4950
) -> None:
5051
"""
5152
Write a batch of feature rows to the online store. This is a low level interface, not
@@ -56,10 +57,9 @@ def online_write_batch(
5657
Args:
5758
project: Feast project name
5859
table: Feast FeatureTable
59-
data: a list of triplets containing Feature data. Each triplet contains an Entity Key,
60-
a dict containing feature values, and event timestamp for the row.
61-
created_ts: the created timestamp (typically set to current time), same value used for
62-
all rows.
60+
data: a list of quadruplets containing Feature data. Each quadruplet contains an Entity Key,
61+
a dict containing feature values, an event timestamp for the row, and
62+
the created timestamp for the row if it exists.
6363
"""
6464
...
6565

0 commit comments

Comments
 (0)