1111# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212# See the License for the specific language governing permissions and
1313# limitations under the License.
14+ import asyncio
1415import itertools
1516import os
1617import warnings
3334import pandas as pd
3435import pyarrow as pa
3536from colorama import Fore , Style
37+ from fastapi .concurrency import run_in_threadpool
3638from google .protobuf .timestamp_pb2 import Timestamp
3739from tqdm import tqdm
3840
@@ -1433,26 +1435,13 @@ def tqdm_builder(length):
14331435 end_date ,
14341436 )
14351437
1436- def push (
1437- self ,
1438- push_source_name : str ,
1439- df : pd .DataFrame ,
1440- allow_registry_cache : bool = True ,
1441- to : PushMode = PushMode .ONLINE ,
1442- ):
1443- """
1444- Push features to a push source. This updates all the feature views that have the push source as stream source.
1445-
1446- Args:
1447- push_source_name: The name of the push source we want to push data to.
1448- df: The data being pushed.
1449- allow_registry_cache: Whether to allow cached versions of the registry.
1450- to: Whether to push to online or offline store. Defaults to online store only.
1451- """
1438+ def _fvs_for_push_source_or_raise (
1439+ self , push_source_name : str , allow_cache : bool
1440+ ) -> set [FeatureView ]:
14521441 from feast .data_source import PushSource
14531442
1454- all_fvs = self .list_feature_views (allow_cache = allow_registry_cache )
1455- all_fvs += self .list_stream_feature_views (allow_cache = allow_registry_cache )
1443+ all_fvs = self .list_feature_views (allow_cache = allow_cache )
1444+ all_fvs += self .list_stream_feature_views (allow_cache = allow_cache )
14561445
14571446 fvs_with_push_sources = {
14581447 fv
@@ -1467,7 +1456,27 @@ def push(
14671456 if not fvs_with_push_sources :
14681457 raise PushSourceNotFoundException (push_source_name )
14691458
1470- for fv in fvs_with_push_sources :
1459+ return fvs_with_push_sources
1460+
1461+ def push (
1462+ self ,
1463+ push_source_name : str ,
1464+ df : pd .DataFrame ,
1465+ allow_registry_cache : bool = True ,
1466+ to : PushMode = PushMode .ONLINE ,
1467+ ):
1468+ """
1469+ Push features to a push source. This updates all the feature views that have the push source as stream source.
1470+
1471+ Args:
1472+ push_source_name: The name of the push source we want to push data to.
1473+ df: The data being pushed.
1474+ allow_registry_cache: Whether to allow cached versions of the registry.
1475+ to: Whether to push to online or offline store. Defaults to online store only.
1476+ """
1477+ for fv in self ._fvs_for_push_source_or_raise (
1478+ push_source_name , allow_registry_cache
1479+ ):
14711480 if to == PushMode .ONLINE or to == PushMode .ONLINE_AND_OFFLINE :
14721481 self .write_to_online_store (
14731482 fv .name , df , allow_registry_cache = allow_registry_cache
@@ -1477,22 +1486,42 @@ def push(
14771486 fv .name , df , allow_registry_cache = allow_registry_cache
14781487 )
14791488
1480- def write_to_online_store (
async def push_async(
    self,
    push_source_name: str,
    df: pd.DataFrame,
    allow_registry_cache: bool = True,
    to: PushMode = PushMode.ONLINE,
):
    """
    Asynchronously push features to a push source.

    Online writes for every matching feature view are awaited together via
    ``asyncio.gather``; offline writes are performed one after another inside
    a single thread-pool task so the blocking offline call does not run on
    the event loop.

    Args:
        push_source_name: The name of the push source we want to push data to.
        df: The data being pushed.
        allow_registry_cache: Whether to allow cached versions of the registry.
        to: Whether to push to online or offline store. Defaults to online store only.

    Raises:
        PushSourceNotFoundException: Raised by the feature-view lookup when no
            feature view uses the given push source.
    """
    target_fvs = self._fvs_for_push_source_or_raise(
        push_source_name, allow_registry_cache
    )

    if to == PushMode.ONLINE or to == PushMode.ONLINE_AND_OFFLINE:
        # Create one coroutine per feature view and await them all at once.
        online_writes = [
            self.write_to_online_store_async(
                view.name, df, allow_registry_cache=allow_registry_cache
            )
            for view in target_fvs
        ]
        await asyncio.gather(*online_writes)

    if to == PushMode.OFFLINE or to == PushMode.ONLINE_AND_OFFLINE:

        def _write_all_offline():
            # Runs in a worker thread: the offline write is synchronous.
            for view in target_fvs:
                self.write_to_offline_store(
                    view.name, df, allow_registry_cache=allow_registry_cache
                )

        await run_in_threadpool(_write_all_offline)
1517+
1518+ def _get_feature_view_and_df_for_online_write (
14811519 self ,
14821520 feature_view_name : str ,
14831521 df : Optional [pd .DataFrame ] = None ,
14841522 inputs : Optional [Union [Dict [str , List [Any ]], pd .DataFrame ]] = None ,
14851523 allow_registry_cache : bool = True ,
14861524 ):
1487- """
1488- Persists a dataframe to the online store.
1489-
1490- Args:
1491- feature_view_name: The feature view to which the dataframe corresponds.
1492- df: The dataframe to be persisted.
1493- inputs: Optional the dictionary object to be written
1494- allow_registry_cache (optional): Whether to allow retrieving feature views from a cached registry.
1495- """
14961525 feature_view_dict = {
14971526 fv_proto .name : fv_proto
14981527 for fv_proto in self .list_all_feature_views (allow_registry_cache )
@@ -1519,10 +1548,60 @@ def write_to_online_store(
15191548 df = pd .DataFrame (df )
15201549 except Exception as _ :
15211550 raise DataFrameSerializationError (df )
1551+ return feature_view , df
1552+
def write_to_online_store(
    self,
    feature_view_name: str,
    df: Optional[pd.DataFrame] = None,
    inputs: Optional[Union[Dict[str, List[Any]], pd.DataFrame]] = None,
    allow_registry_cache: bool = True,
):
    """
    Persists a dataframe to the online store.

    The feature view and the dataframe to write are resolved by
    ``_get_feature_view_and_df_for_online_write``; the result is then handed
    to the provider's ``ingest_df``.

    Args:
        feature_view_name: The feature view to which the dataframe corresponds.
        df: The dataframe to be persisted.
        inputs: Optional dictionary object (or dataframe) to be written.
        allow_registry_cache (optional): Whether to allow retrieving feature views from a cached registry.
    """
    resolved_view, resolved_df = self._get_feature_view_and_df_for_online_write(
        feature_view_name=feature_view_name,
        df=df,
        inputs=inputs,
        allow_registry_cache=allow_registry_cache,
    )
    # The provider is looked up only after the inputs resolved successfully.
    self._get_provider().ingest_df(resolved_view, resolved_df)
15251578
async def write_to_online_store_async(
    self,
    feature_view_name: str,
    df: Optional[pd.DataFrame] = None,
    inputs: Optional[Union[Dict[str, List[Any]], pd.DataFrame]] = None,
    allow_registry_cache: bool = True,
):
    """
    Persists a dataframe to the online store asynchronously.

    The feature view and the dataframe to write are resolved synchronously by
    ``_get_feature_view_and_df_for_online_write``; only the provider's
    ``ingest_df_async`` call is awaited.

    Args:
        feature_view_name: The feature view to which the dataframe corresponds.
        df: The dataframe to be persisted.
        inputs: Optional dictionary object (or dataframe) to be written.
        allow_registry_cache (optional): Whether to allow retrieving feature views from a cached registry.
    """
    resolved_view, resolved_df = self._get_feature_view_and_df_for_online_write(
        feature_view_name=feature_view_name,
        df=df,
        inputs=inputs,
        allow_registry_cache=allow_registry_cache,
    )
    # The provider is looked up only after the inputs resolved successfully.
    await self._get_provider().ingest_df_async(resolved_view, resolved_df)
1604+
15261605 def write_to_offline_store (
15271606 self ,
15281607 feature_view_name : str ,
0 commit comments