@@ -1550,6 +1550,54 @@ def get_online_features(
15501550 native_entity_values = True ,
15511551 )
15521552
1553+ @log_exceptions_and_usage
1554+ async def get_online_features_async (
1555+ self ,
1556+ features : Union [List [str ], FeatureService ],
1557+ entity_rows : List [Dict [str , Any ]],
1558+ full_feature_names : bool = False ,
1559+ ) -> OnlineResponse :
1560+ """
1561+ [Alpha] Retrieves the latest online feature data asynchronously.
1562+
1563+ Note: This method will download the full feature registry the first time it is run. If you are using a
1564+ remote registry like GCS or S3 then that may take a few seconds. The registry remains cached up to a TTL
1565+ duration (which can be set to infinity). If the cached registry is stale (more time than the TTL has
1566+ passed), then a new registry will be downloaded synchronously by this method. This download may
1567+ introduce latency to online feature retrieval. In order to avoid synchronous downloads, please call
1568+ refresh_registry() prior to the TTL being reached. Remember it is possible to set the cache TTL to
1569+ infinity (cache forever).
1570+
1571+ Args:
1572+ features: The list of features that should be retrieved from the online store. These features can be
1573+ specified either as a list of string feature references or as a feature service. String feature
1574+ references must have format "feature_view:feature", e.g. "customer_fv:daily_transactions".
1575+ entity_rows: A list of dictionaries where each key-value is an entity-name, entity-value pair.
1576+ full_feature_names: If True, feature names will be prefixed with the corresponding feature view name,
1577+ changing them from the format "feature" to "feature_view__feature" (e.g. "daily_transactions"
1578+ changes to "customer_fv__daily_transactions").
1579+
1580+ Returns:
1581+ OnlineResponse containing the feature data in records.
1582+
1583+ Raises:
1584+ Exception: No entity with the specified name exists.
1585+ """
1586+ columnar : Dict [str , List [Any ]] = {k : [] for k in entity_rows [0 ].keys ()}
1587+ for entity_row in entity_rows :
1588+ for key , value in entity_row .items ():
1589+ try :
1590+ columnar [key ].append (value )
1591+ except KeyError as e :
1592+ raise ValueError ("All entity_rows must have the same keys." ) from e
1593+
1594+ return await self ._get_online_features_async (
1595+ features = features ,
1596+ entity_values = columnar ,
1597+ full_feature_names = full_feature_names ,
1598+ native_entity_values = True ,
1599+ )
1600+
15531601 def _get_online_request_context (
15541602 self , features : Union [List [str ], FeatureService ], full_feature_names : bool
15551603 ):
@@ -1609,7 +1657,7 @@ def _get_online_request_context(
16091657 entityless_case ,
16101658 )
16111659
1612- def _get_online_features (
1660+ def _prepare_entities_to_read_from_online_store (
16131661 self ,
16141662 features : Union [List [str ], FeatureService ],
16151663 entity_values : Mapping [
@@ -1619,7 +1667,7 @@ def _get_online_features(
16191667 native_entity_values : bool = True ,
16201668 ):
16211669 (
1622- _feature_refs ,
1670+ feature_refs ,
16231671 requested_on_demand_feature_views ,
16241672 entity_name_to_join_key_map ,
16251673 entity_type_map ,
@@ -1694,6 +1742,40 @@ def _get_online_features(
16941742 [DUMMY_ENTITY_VAL ] * num_rows , DUMMY_ENTITY .value_type
16951743 )
16961744
1745+ return (
1746+ join_key_values ,
1747+ grouped_refs ,
1748+ entity_name_to_join_key_map ,
1749+ requested_on_demand_feature_views ,
1750+ feature_refs ,
1751+ requested_result_row_names ,
1752+ online_features_response ,
1753+ )
1754+
1755+ def _get_online_features (
1756+ self ,
1757+ features : Union [List [str ], FeatureService ],
1758+ entity_values : Mapping [
1759+ str , Union [Sequence [Any ], Sequence [Value ], RepeatedValue ]
1760+ ],
1761+ full_feature_names : bool = False ,
1762+ native_entity_values : bool = True ,
1763+ ):
1764+ (
1765+ join_key_values ,
1766+ grouped_refs ,
1767+ entity_name_to_join_key_map ,
1768+ requested_on_demand_feature_views ,
1769+ feature_refs ,
1770+ requested_result_row_names ,
1771+ online_features_response ,
1772+ ) = self ._prepare_entities_to_read_from_online_store (
1773+ features = features ,
1774+ entity_values = entity_values ,
1775+ full_feature_names = full_feature_names ,
1776+ native_entity_values = native_entity_values ,
1777+ )
1778+
16971779 provider = self ._get_provider ()
16981780 for table , requested_features in grouped_refs :
16991781 # Get the correct set of entity values with the correct join keys.
@@ -1724,7 +1806,71 @@ def _get_online_features(
17241806 if requested_on_demand_feature_views :
17251807 self ._augment_response_with_on_demand_transforms (
17261808 online_features_response ,
1727- _feature_refs ,
1809+ feature_refs ,
1810+ requested_on_demand_feature_views ,
1811+ full_feature_names ,
1812+ )
1813+
1814+ self ._drop_unneeded_columns (
1815+ online_features_response , requested_result_row_names
1816+ )
1817+ return OnlineResponse (online_features_response )
1818+
1819+ async def _get_online_features_async (
1820+ self ,
1821+ features : Union [List [str ], FeatureService ],
1822+ entity_values : Mapping [
1823+ str , Union [Sequence [Any ], Sequence [Value ], RepeatedValue ]
1824+ ],
1825+ full_feature_names : bool = False ,
1826+ native_entity_values : bool = True ,
1827+ ):
1828+ (
1829+ join_key_values ,
1830+ grouped_refs ,
1831+ entity_name_to_join_key_map ,
1832+ requested_on_demand_feature_views ,
1833+ feature_refs ,
1834+ requested_result_row_names ,
1835+ online_features_response ,
1836+ ) = self ._prepare_entities_to_read_from_online_store (
1837+ features = features ,
1838+ entity_values = entity_values ,
1839+ full_feature_names = full_feature_names ,
1840+ native_entity_values = native_entity_values ,
1841+ )
1842+
1843+ provider = self ._get_provider ()
1844+ for table , requested_features in grouped_refs :
1845+ # Get the correct set of entity values with the correct join keys.
1846+ table_entity_values , idxs = self ._get_unique_entities (
1847+ table ,
1848+ join_key_values ,
1849+ entity_name_to_join_key_map ,
1850+ )
1851+
1852+ # Fetch feature data for the minimum set of Entities.
1853+ feature_data = await self ._read_from_online_store_async (
1854+ table_entity_values ,
1855+ provider ,
1856+ requested_features ,
1857+ table ,
1858+ )
1859+
1860+ # Populate the result_rows with the Features from the OnlineStore inplace.
1861+ self ._populate_response_from_feature_data (
1862+ feature_data ,
1863+ idxs ,
1864+ online_features_response ,
1865+ full_feature_names ,
1866+ requested_features ,
1867+ table ,
1868+ )
1869+
1870+ if requested_on_demand_feature_views :
1871+ self ._augment_response_with_on_demand_transforms (
1872+ online_features_response ,
1873+ feature_refs ,
17281874 requested_on_demand_feature_views ,
17291875 full_feature_names ,
17301876 )
@@ -1965,38 +2111,24 @@ def _get_unique_entities(
19652111 )
19662112 return unique_entities , indexes
19672113
1968- def _read_from_online_store (
2114+ def _get_entity_key_protos (
19692115 self ,
19702116 entity_rows : Iterable [Mapping [str , Value ]],
1971- provider : Provider ,
1972- requested_features : List [str ],
1973- table : FeatureView ,
1974- ) -> List [Tuple [List [Timestamp ], List ["FieldStatus.ValueType" ], List [Value ]]]:
1975- """Read and process data from the OnlineStore for a given FeatureView.
1976-
1977- This method guarantees that the order of the data in each element of the
1978- List returned is the same as the order of `requested_features`.
1979-
1980- This method assumes that `provider.online_read` returns data for each
1981- combination of Entities in `entity_rows` in the same order as they
1982- are provided.
1983- """
2117+ ) -> List [EntityKeyProto ]:
19842118 # Instantiate one EntityKeyProto per Entity.
19852119 entity_key_protos = [
19862120 EntityKeyProto (join_keys = row .keys (), entity_values = row .values ())
19872121 for row in entity_rows
19882122 ]
2123+ return entity_key_protos
19892124
1990- # Fetch data for Entities.
1991- read_rows = provider .online_read (
1992- config = self .config ,
1993- table = table ,
1994- entity_keys = entity_key_protos ,
1995- requested_features = requested_features ,
1996- )
1997-
1998- # Each row is a set of features for a given entity key. We only need to convert
1999- # the data to Protobuf once.
2125+ def _convert_rows_to_protobuf (
2126+ self ,
2127+ requested_features : List [str ],
2128+ read_rows : List [Tuple [Optional [datetime ], Optional [Dict [str , Value ]]]],
2129+ ) -> List [Tuple [List [Timestamp ], List ["FieldStatus.ValueType" ], List [Value ]]]:
2130+ # Each row is a set of features for a given entity key.
2131+ # We only need to convert the data to Protobuf once.
20002132 null_value = Value ()
20012133 read_row_protos = []
20022134 for read_row in read_rows :
@@ -2023,6 +2155,53 @@ def _read_from_online_store(
20232155 read_row_protos .append ((event_timestamps , statuses , values ))
20242156 return read_row_protos
20252157
2158+ def _read_from_online_store (
2159+ self ,
2160+ entity_rows : Iterable [Mapping [str , Value ]],
2161+ provider : Provider ,
2162+ requested_features : List [str ],
2163+ table : FeatureView ,
2164+ ) -> List [Tuple [List [Timestamp ], List ["FieldStatus.ValueType" ], List [Value ]]]:
2165+ """Read and process data from the OnlineStore for a given FeatureView.
2166+
2167+ This method guarantees that the order of the data in each element of the
2168+ List returned is the same as the order of `requested_features`.
2169+
2170+ This method assumes that `provider.online_read` returns data for each
2171+ combination of Entities in `entity_rows` in the same order as they
2172+ are provided.
2173+ """
2174+ entity_key_protos = self ._get_entity_key_protos (entity_rows )
2175+
2176+ # Fetch data for Entities.
2177+ read_rows = provider .online_read (
2178+ config = self .config ,
2179+ table = table ,
2180+ entity_keys = entity_key_protos ,
2181+ requested_features = requested_features ,
2182+ )
2183+
2184+ return self ._convert_rows_to_protobuf (requested_features , read_rows )
2185+
2186+ async def _read_from_online_store_async (
2187+ self ,
2188+ entity_rows : Iterable [Mapping [str , Value ]],
2189+ provider : Provider ,
2190+ requested_features : List [str ],
2191+ table : FeatureView ,
2192+ ) -> List [Tuple [List [Timestamp ], List ["FieldStatus.ValueType" ], List [Value ]]]:
2193+ entity_key_protos = self ._get_entity_key_protos (entity_rows )
2194+
2195+ # Fetch data for Entities.
2196+ read_rows = await provider .online_read_async (
2197+ config = self .config ,
2198+ table = table ,
2199+ entity_keys = entity_key_protos ,
2200+ requested_features = requested_features ,
2201+ )
2202+
2203+ return self ._convert_rows_to_protobuf (requested_features , read_rows )
2204+
20262205 def _retrieve_from_online_store (
20272206 self ,
20282207 provider : Provider ,
0 commit comments