@@ -1559,75 +1559,16 @@ def get_online_features(
15591559 ... )
15601560 >>> online_response_dict = online_response.to_dict()
15611561 """
1562- if isinstance (entity_rows , list ):
1563- columnar : Dict [str , List [Any ]] = {k : [] for k in entity_rows [0 ].keys ()}
1564- for entity_row in entity_rows :
1565- for key , value in entity_row .items ():
1566- try :
1567- columnar [key ].append (value )
1568- except KeyError as e :
1569- raise ValueError (
1570- "All entity_rows must have the same keys."
1571- ) from e
1572-
1573- entity_rows = columnar
1562+ provider = self ._get_provider ()
15741563
1575- (
1576- join_key_values ,
1577- grouped_refs ,
1578- entity_name_to_join_key_map ,
1579- requested_on_demand_feature_views ,
1580- feature_refs ,
1581- requested_result_row_names ,
1582- online_features_response ,
1583- ) = utils ._prepare_entities_to_read_from_online_store (
1564+ return provider .get_online_features (
1565+ config = self .config ,
1566+ features = features ,
1567+ entity_rows = entity_rows ,
15841568 registry = self ._registry ,
15851569 project = self .project ,
1586- features = features ,
1587- entity_values = entity_rows ,
15881570 full_feature_names = full_feature_names ,
1589- native_entity_values = True ,
1590- )
1591-
1592- provider = self ._get_provider ()
1593- for table , requested_features in grouped_refs :
1594- # Get the correct set of entity values with the correct join keys.
1595- table_entity_values , idxs = utils ._get_unique_entities (
1596- table ,
1597- join_key_values ,
1598- entity_name_to_join_key_map ,
1599- )
1600-
1601- # Fetch feature data for the minimum set of Entities.
1602- feature_data = self ._read_from_online_store (
1603- table_entity_values ,
1604- provider ,
1605- requested_features ,
1606- table ,
1607- )
1608-
1609- # Populate the result_rows with the Features from the OnlineStore inplace.
1610- utils ._populate_response_from_feature_data (
1611- feature_data ,
1612- idxs ,
1613- online_features_response ,
1614- full_feature_names ,
1615- requested_features ,
1616- table ,
1617- )
1618-
1619- if requested_on_demand_feature_views :
1620- utils ._augment_response_with_on_demand_transforms (
1621- online_features_response ,
1622- feature_refs ,
1623- requested_on_demand_feature_views ,
1624- full_feature_names ,
1625- )
1626-
1627- utils ._drop_unneeded_columns (
1628- online_features_response , requested_result_row_names
16291571 )
1630- return OnlineResponse (online_features_response )
16311572
16321573 async def get_online_features_async (
16331574 self ,
@@ -1664,75 +1605,16 @@ async def get_online_features_async(
16641605 Raises:
16651606 Exception: No entity with the specified name exists.
16661607 """
1667- if isinstance (entity_rows , list ):
1668- columnar : Dict [str , List [Any ]] = {k : [] for k in entity_rows [0 ].keys ()}
1669- for entity_row in entity_rows :
1670- for key , value in entity_row .items ():
1671- try :
1672- columnar [key ].append (value )
1673- except KeyError as e :
1674- raise ValueError (
1675- "All entity_rows must have the same keys."
1676- ) from e
1677-
1678- entity_rows = columnar
1608+ provider = self ._get_provider ()
16791609
1680- (
1681- join_key_values ,
1682- grouped_refs ,
1683- entity_name_to_join_key_map ,
1684- requested_on_demand_feature_views ,
1685- feature_refs ,
1686- requested_result_row_names ,
1687- online_features_response ,
1688- ) = utils ._prepare_entities_to_read_from_online_store (
1610+ return await provider .get_online_features_async (
1611+ config = self .config ,
1612+ features = features ,
1613+ entity_rows = entity_rows ,
16891614 registry = self ._registry ,
16901615 project = self .project ,
1691- features = features ,
1692- entity_values = entity_rows ,
16931616 full_feature_names = full_feature_names ,
1694- native_entity_values = True ,
1695- )
1696-
1697- provider = self ._get_provider ()
1698- for table , requested_features in grouped_refs :
1699- # Get the correct set of entity values with the correct join keys.
1700- table_entity_values , idxs = utils ._get_unique_entities (
1701- table ,
1702- join_key_values ,
1703- entity_name_to_join_key_map ,
1704- )
1705-
1706- # Fetch feature data for the minimum set of Entities.
1707- feature_data = await self ._read_from_online_store_async (
1708- table_entity_values ,
1709- provider ,
1710- requested_features ,
1711- table ,
1712- )
1713-
1714- # Populate the result_rows with the Features from the OnlineStore inplace.
1715- utils ._populate_response_from_feature_data (
1716- feature_data ,
1717- idxs ,
1718- online_features_response ,
1719- full_feature_names ,
1720- requested_features ,
1721- table ,
1722- )
1723-
1724- if requested_on_demand_feature_views :
1725- utils ._augment_response_with_on_demand_transforms (
1726- online_features_response ,
1727- feature_refs ,
1728- requested_on_demand_feature_views ,
1729- full_feature_names ,
1730- )
1731-
1732- utils ._drop_unneeded_columns (
1733- online_features_response , requested_result_row_names
17341617 )
1735- return OnlineResponse (online_features_response )
17361618
17371619 def retrieve_online_documents (
17381620 self ,
@@ -1806,53 +1688,6 @@ def retrieve_online_documents(
18061688 )
18071689 return OnlineResponse (online_features_response )
18081690
1809- def _read_from_online_store (
1810- self ,
1811- entity_rows : Iterable [Mapping [str , Value ]],
1812- provider : Provider ,
1813- requested_features : List [str ],
1814- table : FeatureView ,
1815- ) -> List [Tuple [List [Timestamp ], List ["FieldStatus.ValueType" ], List [Value ]]]:
1816- """Read and process data from the OnlineStore for a given FeatureView.
1817-
1818- This method guarantees that the order of the data in each element of the
1819- List returned is the same as the order of `requested_features`.
1820-
1821- This method assumes that `provider.online_read` returns data for each
1822- combination of Entities in `entity_rows` in the same order as they
1823- are provided.
1824- """
1825- entity_key_protos = utils ._get_entity_key_protos (entity_rows )
1826-
1827- # Fetch data for Entities.
1828- read_rows = provider .online_read (
1829- config = self .config ,
1830- table = table ,
1831- entity_keys = entity_key_protos ,
1832- requested_features = requested_features ,
1833- )
1834-
1835- return utils ._convert_rows_to_protobuf (requested_features , read_rows )
1836-
1837- async def _read_from_online_store_async (
1838- self ,
1839- entity_rows : Iterable [Mapping [str , Value ]],
1840- provider : Provider ,
1841- requested_features : List [str ],
1842- table : FeatureView ,
1843- ) -> List [Tuple [List [Timestamp ], List ["FieldStatus.ValueType" ], List [Value ]]]:
1844- entity_key_protos = utils ._get_entity_key_protos (entity_rows )
1845-
1846- # Fetch data for Entities.
1847- read_rows = await provider .online_read_async (
1848- config = self .config ,
1849- table = table ,
1850- entity_keys = entity_key_protos ,
1851- requested_features = requested_features ,
1852- )
1853-
1854- return utils ._convert_rows_to_protobuf (requested_features , read_rows )
1855-
18561691 def _retrieve_from_online_store (
18571692 self ,
18581693 provider : Provider ,
0 commit comments