Skip to content

Commit 66bb72f

Browse files
robhowleyemgeee
authored andcommitted
perf: Make /push async (feast-dev#4650)
1 parent 27197bd commit 66bb72f

7 files changed

Lines changed: 311 additions & 40 deletions

File tree

sdk/python/feast/feature_server.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ async def get_online_features(body=Depends(get_body)):
165165
)
166166

167167
@app.post("/push", dependencies=[Depends(inject_user_details)])
168-
def push(body=Depends(get_body)):
168+
async def push(body=Depends(get_body)):
169169
request = PushFeaturesRequest(**json.loads(body))
170170
df = pd.DataFrame(request.df)
171171
actions = []
@@ -201,13 +201,22 @@ def push(body=Depends(get_body)):
201201
for feature_view in fvs_with_push_sources:
202202
assert_permissions(resource=feature_view, actions=actions)
203203

204-
store.push(
204+
push_params = dict(
205205
push_source_name=request.push_source_name,
206206
df=df,
207207
allow_registry_cache=request.allow_registry_cache,
208208
to=to,
209209
)
210210

211+
should_push_async = (
212+
store._get_provider().async_supported.online.write
213+
and to in [PushMode.ONLINE, PushMode.ONLINE_AND_OFFLINE]
214+
)
215+
if should_push_async:
216+
await store.push_async(**push_params)
217+
else:
218+
store.push(**push_params)
219+
211220
@app.post("/write-to-online-store", dependencies=[Depends(inject_user_details)])
212221
def write_to_online_store(body=Depends(get_body)):
213222
request = WriteToFeatureStoreRequest(**json.loads(body))

sdk/python/feast/feature_store.py

Lines changed: 108 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14+
import asyncio
1415
import itertools
1516
import os
1617
import warnings
@@ -33,6 +34,7 @@
3334
import pandas as pd
3435
import pyarrow as pa
3536
from colorama import Fore, Style
37+
from fastapi.concurrency import run_in_threadpool
3638
from google.protobuf.timestamp_pb2 import Timestamp
3739
from tqdm import tqdm
3840

@@ -1433,26 +1435,13 @@ def tqdm_builder(length):
14331435
end_date,
14341436
)
14351437

1436-
def push(
1437-
self,
1438-
push_source_name: str,
1439-
df: pd.DataFrame,
1440-
allow_registry_cache: bool = True,
1441-
to: PushMode = PushMode.ONLINE,
1442-
):
1443-
"""
1444-
Push features to a push source. This updates all the feature views that have the push source as stream source.
1445-
1446-
Args:
1447-
push_source_name: The name of the push source we want to push data to.
1448-
df: The data being pushed.
1449-
allow_registry_cache: Whether to allow cached versions of the registry.
1450-
to: Whether to push to online or offline store. Defaults to online store only.
1451-
"""
1438+
def _fvs_for_push_source_or_raise(
1439+
self, push_source_name: str, allow_cache: bool
1440+
) -> set[FeatureView]:
14521441
from feast.data_source import PushSource
14531442

1454-
all_fvs = self.list_feature_views(allow_cache=allow_registry_cache)
1455-
all_fvs += self.list_stream_feature_views(allow_cache=allow_registry_cache)
1443+
all_fvs = self.list_feature_views(allow_cache=allow_cache)
1444+
all_fvs += self.list_stream_feature_views(allow_cache=allow_cache)
14561445

14571446
fvs_with_push_sources = {
14581447
fv
@@ -1467,7 +1456,27 @@ def push(
14671456
if not fvs_with_push_sources:
14681457
raise PushSourceNotFoundException(push_source_name)
14691458

1470-
for fv in fvs_with_push_sources:
1459+
return fvs_with_push_sources
1460+
1461+
def push(
1462+
self,
1463+
push_source_name: str,
1464+
df: pd.DataFrame,
1465+
allow_registry_cache: bool = True,
1466+
to: PushMode = PushMode.ONLINE,
1467+
):
1468+
"""
1469+
Push features to a push source. This updates all the feature views that have the push source as stream source.
1470+
1471+
Args:
1472+
push_source_name: The name of the push source we want to push data to.
1473+
df: The data being pushed.
1474+
allow_registry_cache: Whether to allow cached versions of the registry.
1475+
to: Whether to push to online or offline store. Defaults to online store only.
1476+
"""
1477+
for fv in self._fvs_for_push_source_or_raise(
1478+
push_source_name, allow_registry_cache
1479+
):
14711480
if to == PushMode.ONLINE or to == PushMode.ONLINE_AND_OFFLINE:
14721481
self.write_to_online_store(
14731482
fv.name, df, allow_registry_cache=allow_registry_cache
@@ -1477,22 +1486,42 @@ def push(
14771486
fv.name, df, allow_registry_cache=allow_registry_cache
14781487
)
14791488

1480-
def write_to_online_store(
1489+
async def push_async(
1490+
self,
1491+
push_source_name: str,
1492+
df: pd.DataFrame,
1493+
allow_registry_cache: bool = True,
1494+
to: PushMode = PushMode.ONLINE,
1495+
):
1496+
fvs = self._fvs_for_push_source_or_raise(push_source_name, allow_registry_cache)
1497+
1498+
if to == PushMode.ONLINE or to == PushMode.ONLINE_AND_OFFLINE:
1499+
_ = await asyncio.gather(
1500+
*[
1501+
self.write_to_online_store_async(
1502+
fv.name, df, allow_registry_cache=allow_registry_cache
1503+
)
1504+
for fv in fvs
1505+
]
1506+
)
1507+
1508+
if to == PushMode.OFFLINE or to == PushMode.ONLINE_AND_OFFLINE:
1509+
1510+
def _offline_write():
1511+
for fv in fvs:
1512+
self.write_to_offline_store(
1513+
fv.name, df, allow_registry_cache=allow_registry_cache
1514+
)
1515+
1516+
await run_in_threadpool(_offline_write)
1517+
1518+
def _get_feature_view_and_df_for_online_write(
14811519
self,
14821520
feature_view_name: str,
14831521
df: Optional[pd.DataFrame] = None,
14841522
inputs: Optional[Union[Dict[str, List[Any]], pd.DataFrame]] = None,
14851523
allow_registry_cache: bool = True,
14861524
):
1487-
"""
1488-
Persists a dataframe to the online store.
1489-
1490-
Args:
1491-
feature_view_name: The feature view to which the dataframe corresponds.
1492-
df: The dataframe to be persisted.
1493-
inputs: Optional the dictionary object to be written
1494-
allow_registry_cache (optional): Whether to allow retrieving feature views from a cached registry.
1495-
"""
14961525
feature_view_dict = {
14971526
fv_proto.name: fv_proto
14981527
for fv_proto in self.list_all_feature_views(allow_registry_cache)
@@ -1519,10 +1548,60 @@ def write_to_online_store(
15191548
df = pd.DataFrame(df)
15201549
except Exception as _:
15211550
raise DataFrameSerializationError(df)
1551+
return feature_view, df
1552+
1553+
def write_to_online_store(
1554+
self,
1555+
feature_view_name: str,
1556+
df: Optional[pd.DataFrame] = None,
1557+
inputs: Optional[Union[Dict[str, List[Any]], pd.DataFrame]] = None,
1558+
allow_registry_cache: bool = True,
1559+
):
1560+
"""
1561+
Persists a dataframe to the online store.
15221562
1563+
Args:
1564+
feature_view_name: The feature view to which the dataframe corresponds.
1565+
df: The dataframe to be persisted.
1566+
inputs: Optional the dictionary object to be written
1567+
allow_registry_cache (optional): Whether to allow retrieving feature views from a cached registry.
1568+
"""
1569+
1570+
feature_view, df = self._get_feature_view_and_df_for_online_write(
1571+
feature_view_name=feature_view_name,
1572+
df=df,
1573+
inputs=inputs,
1574+
allow_registry_cache=allow_registry_cache,
1575+
)
15231576
provider = self._get_provider()
15241577
provider.ingest_df(feature_view, df)
15251578

1579+
async def write_to_online_store_async(
1580+
self,
1581+
feature_view_name: str,
1582+
df: Optional[pd.DataFrame] = None,
1583+
inputs: Optional[Union[Dict[str, List[Any]], pd.DataFrame]] = None,
1584+
allow_registry_cache: bool = True,
1585+
):
1586+
"""
1587+
Persists a dataframe to the online store asynchronously.
1588+
1589+
Args:
1590+
feature_view_name: The feature view to which the dataframe corresponds.
1591+
df: The dataframe to be persisted.
1592+
inputs: Optional the dictionary object to be written
1593+
allow_registry_cache (optional): Whether to allow retrieving feature views from a cached registry.
1594+
"""
1595+
1596+
feature_view, df = self._get_feature_view_and_df_for_online_write(
1597+
feature_view_name=feature_view_name,
1598+
df=df,
1599+
inputs=inputs,
1600+
allow_registry_cache=allow_registry_cache,
1601+
)
1602+
provider = self._get_provider()
1603+
await provider.ingest_df_async(feature_view, df)
1604+
15261605
def write_to_offline_store(
15271606
self,
15281607
feature_view_name: str,

sdk/python/feast/infra/online_stores/online_store.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,33 @@ def online_write_batch(
6767
"""
6868
pass
6969

70+
async def online_write_batch_async(
71+
self,
72+
config: RepoConfig,
73+
table: FeatureView,
74+
data: List[
75+
Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
76+
],
77+
progress: Optional[Callable[[int], Any]],
78+
) -> None:
79+
"""
80+
Writes a batch of feature rows to the online store asynchronously.
81+
82+
If a tz-naive timestamp is passed to this method, it is assumed to be UTC.
83+
84+
Args:
85+
config: The config for the current feature store.
86+
table: Feature view to which these feature rows correspond.
87+
data: A list of quadruplets containing feature data. Each quadruplet contains an entity
88+
key, a dict containing feature values, an event timestamp for the row, and the created
89+
timestamp for the row if it exists.
90+
progress: Function to be called once a batch of rows is written to the online store, used
91+
to show progress.
92+
"""
93+
raise NotImplementedError(
94+
f"Online store {self.__class__.__name__} does not support online write batch async"
95+
)
96+
7097
@abstractmethod
7198
def online_read(
7299
self,

sdk/python/feast/infra/passthrough_provider.py

Lines changed: 47 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,20 @@ def online_write_batch(
188188
if self.online_store:
189189
self.online_store.online_write_batch(config, table, data, progress)
190190

191+
async def online_write_batch_async(
192+
self,
193+
config: RepoConfig,
194+
table: Union[FeatureView, BaseFeatureView, OnDemandFeatureView],
195+
data: List[
196+
Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
197+
],
198+
progress: Optional[Callable[[int], Any]],
199+
) -> None:
200+
if self.online_store:
201+
await self.online_store.online_write_batch_async(
202+
config, table, data, progress
203+
)
204+
191205
def offline_write_batch(
192206
self,
193207
config: RepoConfig,
@@ -291,8 +305,8 @@ def retrieve_online_documents(
291305
)
292306
return result
293307

294-
def ingest_df(
295-
self,
308+
@staticmethod
309+
def _prep_rows_to_write_for_ingestion(
296310
feature_view: Union[BaseFeatureView, FeatureView, OnDemandFeatureView],
297311
df: pd.DataFrame,
298312
field_mapping: Optional[Dict] = None,
@@ -307,10 +321,6 @@ def ingest_df(
307321
for entity in feature_view.entity_columns
308322
}
309323
rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys)
310-
311-
self.online_write_batch(
312-
self.repo_config, feature_view, rows_to_write, progress=None
313-
)
314324
else:
315325
if hasattr(feature_view, "entity_columns"):
316326
join_keys = {
@@ -336,9 +346,37 @@ def ingest_df(
336346
join_keys[entity.name] = entity.dtype.to_value_type()
337347
rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys)
338348

339-
self.online_write_batch(
340-
self.repo_config, feature_view, rows_to_write, progress=None
341-
)
349+
return rows_to_write
350+
351+
def ingest_df(
352+
self,
353+
feature_view: Union[BaseFeatureView, FeatureView, OnDemandFeatureView],
354+
df: pd.DataFrame,
355+
field_mapping: Optional[Dict] = None,
356+
):
357+
rows_to_write = self._prep_rows_to_write_for_ingestion(
358+
feature_view=feature_view,
359+
df=df,
360+
field_mapping=field_mapping,
361+
)
362+
self.online_write_batch(
363+
self.repo_config, feature_view, rows_to_write, progress=None
364+
)
365+
366+
async def ingest_df_async(
367+
self,
368+
feature_view: Union[BaseFeatureView, FeatureView, OnDemandFeatureView],
369+
df: pd.DataFrame,
370+
field_mapping: Optional[Dict] = None,
371+
):
372+
rows_to_write = self._prep_rows_to_write_for_ingestion(
373+
feature_view=feature_view,
374+
df=df,
375+
field_mapping=field_mapping,
376+
)
377+
await self.online_write_batch_async(
378+
self.repo_config, feature_view, rows_to_write, progress=None
379+
)
342380

343381
def ingest_df_to_offline_store(self, feature_view: FeatureView, table: pa.Table):
344382
if feature_view.batch_source.field_mapping is not None:

sdk/python/feast/infra/provider.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,32 @@ def online_write_batch(
141141
"""
142142
pass
143143

144+
@abstractmethod
145+
async def online_write_batch_async(
146+
self,
147+
config: RepoConfig,
148+
table: FeatureView,
149+
data: List[
150+
Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
151+
],
152+
progress: Optional[Callable[[int], Any]],
153+
) -> None:
154+
"""
155+
Writes a batch of feature rows to the online store asynchronously.
156+
157+
If a tz-naive timestamp is passed to this method, it is assumed to be UTC.
158+
159+
Args:
160+
config: The config for the current feature store.
161+
table: Feature view to which these feature rows correspond.
162+
data: A list of quadruplets containing feature data. Each quadruplet contains an entity
163+
key, a dict containing feature values, an event timestamp for the row, and the created
164+
timestamp for the row if it exists.
165+
progress: Function to be called once a batch of rows is written to the online store, used
166+
to show progress.
167+
"""
168+
pass
169+
144170
def ingest_df(
145171
self,
146172
feature_view: Union[BaseFeatureView, FeatureView, OnDemandFeatureView],
@@ -157,6 +183,22 @@ def ingest_df(
157183
"""
158184
pass
159185

186+
async def ingest_df_async(
187+
self,
188+
feature_view: Union[BaseFeatureView, FeatureView, OnDemandFeatureView],
189+
df: pd.DataFrame,
190+
field_mapping: Optional[Dict] = None,
191+
):
192+
"""
193+
Persists a dataframe to the online store asynchronously.
194+
195+
Args:
196+
feature_view: The feature view to which the dataframe corresponds.
197+
df: The dataframe to be persisted.
198+
field_mapping: A dictionary mapping dataframe column names to feature names.
199+
"""
200+
pass
201+
160202
def ingest_df_to_offline_store(
161203
self,
162204
feature_view: FeatureView,

0 commit comments

Comments
 (0)