Skip to content

Commit 04d2e3d

Browse files
authored
Add materialize_incremental method (#1407)
* Add materialize_incremental
  Signed-off-by: Jacob Klegar <jacob@tecton.ai>
* Rebase
  Signed-off-by: Jacob Klegar <jacob@tecton.ai>
* Address comment
  Signed-off-by: Jacob Klegar <jacob@tecton.ai>
1 parent f1e3072 commit 04d2e3d

File tree

4 files changed

+143
-39
lines changed

4 files changed

+143
-39
lines changed

protos/feast/core/FeatureView.proto

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,4 +71,12 @@ message FeatureViewMeta {
7171

7272
// Time when this Feature View was last updated
7373
google.protobuf.Timestamp last_updated_timestamp = 2;
74+
75+
// List of pairs (start_time, end_time) for which this feature view has been materialized.
76+
repeated MaterializationInterval materialization_intervals = 3;
77+
}
78+
79+
// One (start_time, end_time) pair for which a feature view has been materialized.
message MaterializationInterval {
80+
// Start of the materialized time range.
google.protobuf.Timestamp start_time = 1;
81+
// End of the materialized time range.
google.protobuf.Timestamp end_time = 2;
7482
}

sdk/python/feast/feature_store.py

Lines changed: 82 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,55 @@ def get_historical_features(
181181
)
182182
return job
183183

184+
def materialize_incremental(
    self, feature_views: Optional[List[str]], end_date: datetime,
) -> None:
    """
    Materialize incremental new data from the offline store into the online store.

    This method loads incremental new feature data up to the specified end time from either
    the specified feature views, or all feature views if none are specified,
    into the online store where it is available for online serving. The start time of
    the interval materialized is either the most recent end time of a prior materialization or
    (now - ttl) if no such prior materialization exists.

    Args:
        feature_views (List[str]): Optional list of feature view names. If not None, only
            the named feature views are materialized; pass None to materialize every
            feature view the registry lists for the configured project.
        end_date (datetime): End date for time range of data to materialize into the online store

    Raises:
        Exception: If a feature view has neither a prior materialization interval nor a
            ttl, so no start time can be derived. Feature views that precede it in the
            list have already been materialized when this is raised.

    Examples:
        Materialize all features into the online store up to 5 minutes ago.
        >>> from datetime import datetime, timedelta
        >>> from feast.feature_store import FeatureStore
        >>>
        >>> fs = FeatureStore(config=RepoConfig(provider="gcp"))
        >>> fs.materialize_incremental(
        ...     feature_views=None,
        ...     end_date=datetime.utcnow() - timedelta(minutes=5),
        ... )
    """
    registry = self._get_registry()
    project = self.config.project

    if feature_views is None:
        feature_views_to_materialize = registry.list_feature_views(project)
    else:
        feature_views_to_materialize = [
            registry.get_feature_view(name, project) for name in feature_views
        ]

    # TODO paging large loads
    for feature_view in feature_views_to_materialize:
        # Resume from where the last materialization stopped, when there was one.
        start_date = feature_view.most_recent_end_time
        if start_date is None:
            if feature_view.ttl is None:
                raise Exception(
                    f"No start time found for feature view {feature_view.name}. materialize_incremental() requires either a ttl to be set or for materialize() to have been run at least once."
                )
            # First run for this view: fall back to a window of one ttl before now.
            start_date = datetime.utcnow() - feature_view.ttl
        self._materialize_single_feature_view(feature_view, start_date, end_date)
232+
184233
def materialize(
185234
self,
186235
feature_views: Optional[List[str]],
@@ -225,39 +274,43 @@ def materialize(
225274

226275
# TODO paging large loads
227276
for feature_view in feature_views_to_materialize:
228-
if isinstance(feature_view.input, FileSource):
229-
raise NotImplementedError(
230-
"This function is not yet implemented for File data sources"
231-
)
232-
(
233-
entity_names,
234-
feature_names,
235-
event_timestamp_column,
236-
created_timestamp_column,
237-
) = _run_reverse_field_mapping(feature_view)
238-
239-
offline_store = get_offline_store(self.config)
240-
table = offline_store.pull_latest_from_table_or_query(
241-
feature_view.input,
242-
entity_names,
243-
feature_names,
244-
event_timestamp_column,
245-
created_timestamp_column,
246-
start_date,
247-
end_date,
277+
self._materialize_single_feature_view(feature_view, start_date, end_date)
278+
279+
def _materialize_single_feature_view(
280+
self, feature_view: FeatureView, start_date: datetime, end_date: datetime
281+
) -> None:
# Materialize one feature view: pull its rows for the start_date..end_date range from
# the offline store, write them to the online store through the provider, then record
# the range on the feature view and pass the view to apply().
# Raises NotImplementedError when the feature view's input is a FileSource.
# NOTE(review): this span is a rendered diff. Lines under "NNN-" gutters are the removed
# copies of the same statements from the old materialize() loop body; lines under
# "NNN+" gutters form the new method.
282+
# File-backed sources are rejected up front.
if isinstance(feature_view.input, FileSource):
283+
raise NotImplementedError(
284+
"This function is not yet implemented for File data sources"
248285
)
286+
# Column names come from _run_reverse_field_mapping (defined elsewhere; presumably it
# maps feature-view names back to the source's own column names — confirm).
(
287+
entity_names,
288+
feature_names,
289+
event_timestamp_column,
290+
created_timestamp_column,
291+
) = _run_reverse_field_mapping(feature_view)
292+
293+
offline_store = get_offline_store(self.config)
294+
table = offline_store.pull_latest_from_table_or_query(
295+
feature_view.input,
296+
entity_names,
297+
feature_names,
298+
event_timestamp_column,
299+
created_timestamp_column,
300+
start_date,
301+
end_date,
302+
)
249303

250-
if feature_view.input.field_mapping is not None:
251-
table = _run_forward_field_mapping(
252-
table, feature_view.input.field_mapping
253-
)
304+
# Apply the source's field mapping to the pulled table when one is configured.
if feature_view.input.field_mapping is not None:
305+
table = _run_forward_field_mapping(table, feature_view.input.field_mapping)
254306

255-
rows_to_write = _convert_arrow_to_proto(table, feature_view)
307+
rows_to_write = _convert_arrow_to_proto(table, feature_view)
256308

257-
provider = self._get_provider()
258-
provider.online_write_batch(
259-
self.config.project, feature_view, rows_to_write
260-
)
309+
provider = self._get_provider()
310+
provider.online_write_batch(self.config.project, feature_view, rows_to_write)
311+
312+
# Record the range just written; most_recent_end_time reads these intervals to pick
# the start of the next incremental run. This runs only after the online write returns.
# NOTE(review): materialization_intervals is declared as a class-level list on
# FeatureView; unless __init__ rebinds it per instance this append is shared — confirm.
feature_view.materialization_intervals.append((start_date, end_date))
313+
# Hand the updated feature view to apply() (defined elsewhere; presumably persists it
# to the registry — confirm).
self.apply([feature_view])
261314

262315
def get_online_features(
263316
self, feature_refs: List[str], entity_rows: List[Dict[str, Any]],

sdk/python/feast/feature_view.py

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14-
from datetime import timedelta
15-
from typing import Dict, List, Optional, Union
14+
from datetime import datetime, timedelta
15+
from typing import Dict, List, Optional, Tuple, Union
1616

1717
from google.protobuf.duration_pb2 import Duration
1818
from google.protobuf.timestamp_pb2 import Timestamp
@@ -26,6 +26,9 @@
2626
from feast.protos.feast.core.FeatureView_pb2 import (
2727
FeatureViewSpec as FeatureViewSpecProto,
2828
)
29+
from feast.protos.feast.core.FeatureView_pb2 import (
30+
MaterializationInterval as MaterializationIntervalProto,
31+
)
2932
from feast.value_type import ValueType
3033

3134

@@ -44,6 +47,7 @@ class FeatureView:
4447

4548
created_timestamp: Optional[Timestamp] = None
4649
last_updated_timestamp: Optional[Timestamp] = None
50+
materialization_intervals: List[Tuple[datetime, datetime]] = []
4751

4852
def __init__(
4953
self,
@@ -98,7 +102,13 @@ def to_proto(self) -> FeatureViewProto:
98102
meta = FeatureViewMetaProto(
99103
created_timestamp=self.created_timestamp,
100104
last_updated_timestamp=self.last_updated_timestamp,
105+
materialization_intervals=[],
101106
)
107+
for interval in self.materialization_intervals:
108+
interval_proto = MaterializationIntervalProto()
109+
interval_proto.start_time.FromDatetime(interval[0])
110+
interval_proto.end_time.FromDatetime(interval[1])
111+
meta.materialization_intervals.append(interval_proto)
102112

103113
if self.ttl is not None:
104114
ttl_duration = Duration()
@@ -152,4 +162,15 @@ def from_proto(cls, feature_view_proto: FeatureViewProto):
152162

153163
feature_view.created_timestamp = feature_view_proto.meta.created_timestamp
154164

165+
for interval in feature_view_proto.meta.materialization_intervals:
166+
feature_view.materialization_intervals.append(
167+
(interval.start_time.ToDatetime(), interval.end_time.ToDatetime())
168+
)
169+
155170
return feature_view
171+
172+
@property
def most_recent_end_time(self) -> Optional[datetime]:
    """
    Return the latest end time among the recorded materialization intervals.

    Returns:
        The maximum second element across ``self.materialization_intervals``, or
        None when no interval has been recorded.
    """
    # NOTE(review): materialization_intervals is declared with a class-level `[]`
    # default; unless __init__ rebinds it per instance, every feature view shares one
    # list and this would report another view's end time — confirm.
    return max(
        (interval[1] for interval in self.materialization_intervals), default=None
    )

sdk/python/tests/test_materialize_from_bigquery_to_datastore.py

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,19 @@ def setup_method(self):
2828

2929
def test_bigquery_table_to_datastore_correctness(self):
3030
# create dataset
31-
ts = pd.Timestamp.now(tz="UTC").round("ms")
31+
now = datetime.utcnow()
32+
ts = pd.Timestamp(now).round("ms")
3233
data = {
33-
"id": [1, 2, 1],
34-
"value": [0.1, 0.2, 0.3],
35-
"ts_1": [ts - timedelta(minutes=2), ts, ts],
36-
"created_ts": [ts, ts, ts],
34+
"id": [1, 2, 1, 3, 3],
35+
"value": [0.1, 0.2, 0.3, 4, 5],
36+
"ts_1": [
37+
ts - timedelta(seconds=4),
38+
ts,
39+
ts - timedelta(seconds=3),
40+
ts - timedelta(seconds=4),
41+
ts - timedelta(seconds=1),
42+
],
43+
"created_ts": [ts, ts, ts, ts, ts],
3744
}
3845
df = pd.DataFrame.from_dict(data)
3946

@@ -67,9 +74,7 @@ def test_bigquery_table_to_datastore_correctness(self):
6774

6875
# run materialize()
6976
fs.materialize(
70-
[fv.name],
71-
datetime.utcnow() - timedelta(minutes=5),
72-
datetime.utcnow() - timedelta(minutes=0),
77+
[fv.name], now - timedelta(seconds=5), now - timedelta(seconds=2),
7378
)
7479

7580
# check result of materialize()
@@ -78,6 +83,23 @@ def test_bigquery_table_to_datastore_correctness(self):
7883
).to_dict()
7984
assert abs(response_dict[f"{fv.name}:value"][0] - 0.3) < 1e-6
8085

86+
# check prior value for materialize_incremental()
87+
response_dict = fs.get_online_features(
88+
[f"{fv.name}:value"], [{"driver_id": 3}]
89+
).to_dict()
90+
assert abs(response_dict[f"{fv.name}:value"][0] - 4) < 1e-6
91+
92+
# run materialize_incremental()
93+
fs.materialize_incremental(
94+
[fv.name], now - timedelta(seconds=0),
95+
)
96+
97+
# check result of materialize_incremental()
98+
response_dict = fs.get_online_features(
99+
[f"{fv.name}:value"], [{"driver_id": 3}]
100+
).to_dict()
101+
assert abs(response_dict[f"{fv.name}:value"][0] - 5) < 1e-6
102+
81103
def test_bigquery_query_to_datastore_correctness(self):
82104
# create dataset
83105
ts = pd.Timestamp.now(tz="UTC").round("ms")

0 commit comments

Comments
 (0)