1414from feast import Entity
1515from feast .feature_view import FeatureView
1616from feast .infra .key_encoding_utils import serialize_entity_key
17- from feast .infra .online_stores .document_store import (
18- DocumentStore ,
19- DocumentStoreIndexConfig ,
20- )
2117from feast .infra .online_stores .online_store import OnlineStore
2218from feast .infra .utils .postgres .connection_utils import _get_conn , _get_connection_pool
2319from feast .infra .utils .postgres .postgres_config import ConnectionType , PostgreSQLConfig
2723from feast .usage import log_exceptions_and_usage
2824
2925
26+ # Search query template to find the top k items that are closest to the given embedding
27+ # SELECT * FROM items ORDER BY embedding <-> '[3,1,2]' LIMIT 5;
28+ SEARCH_QUERY_TEMPLATE = """
29+ SELECT feature_name, value, event_ts FROM {table_name}
30+ WHERE feature_name = '{feature_name}'
31+ ORDER BY value <-> %s
32+ LIMIT %s;
33+ """
34+
35+
36+ # Create index query template to create a index based on the index type
37+ CREATE_INDEX_QUERY_TEMPLATE = """
38+ CREATE INDEX ON {table_name} USING {index_type} (embedding {embeding_type});
39+ """
40+
41+
3042class PostgreSQLOnlineStoreConfig (PostgreSQLConfig ):
3143 type : Literal ["postgres" ] = "postgres"
3244
@@ -256,6 +268,48 @@ def teardown(
256268 logging .exception ("Teardown failed" )
257269 raise
258270
271+ def retrieve_online_documents (
272+ self ,
273+ config : RepoConfig ,
274+ table : FeatureView ,
275+ requested_feature : str ,
276+ embedding : List [float ],
277+ top_k : int ,
278+ ) -> List [Tuple [Optional [datetime ], Optional [Dict [str , ValueProto ]]]]:
279+ """
280+
281+ Args:
282+ config: Feast configuration object
283+ table: FeatureView object as the table to search
284+ requested_feature: The requested feature as the column to search
285+ embedding: The query embedding to search for
286+ top_k: The number of items to return
287+ Returns:
288+ List of tuples containing the event timestamp and the document feature
289+
290+ """
291+
292+ # Convert the embedding to a string to be used in postgres vector search
293+ query_embedding_str = f"'[{ ',' .join (str (el ) for el in embedding )} ]'"
294+
295+ result : List [Tuple [Optional [datetime ], Optional [Dict [str , ValueProto ]]]] = []
296+ with self ._get_conn (config ) as conn , conn .cursor () as cur :
297+ cur .execute (
298+ SEARCH_QUERY_TEMPLATE .format (
299+ table_name = table , feature_name = requested_feature
300+ ),
301+ (query_embedding_str , top_k ),
302+ )
303+ rows = cur .fetchall ()
304+
305+ for feature_name , value , event_ts in rows :
306+ val = ValueProto ()
307+ val .ParseFromString (value )
308+
309+ res = {feature_name : val }
310+ result .append ((event_ts , res ))
311+
312+ return result
259313
260314def _table_id (project : str , table : FeatureView ) -> str :
261315 return f"{ project } _{ table .name } "
@@ -278,75 +332,3 @@ def _to_naive_utc(ts: datetime):
278332 return ts
279333 else :
280334 return ts .astimezone (pytz .utc ).replace (tzinfo = None )
281-
282-
283- # Search query template to find the top k items that are closest to the given embedding
284- # SELECT * FROM items ORDER BY embedding <-> '[3,1,2]' LIMIT 5;
285- SEARCH_QUERY_TEMPLATE = """
286- SELECT entity_key, feature_name, value, event_ts FROM {table_name}
287- WHERE feature_name = '{feature_name}'
288- ORDER BY value <-> %s
289- LIMIT %s;
290- """
291-
292- # Create index query template to create a index based on the index type
293- CREATE_INDEX_QUERY_TEMPLATE = """
294- CREATE INDEX ON {table_name} USING {index_type} (embedding {embeding_type});
295- """
296-
297-
298- class PostgresDocumentStoreConfig (DocumentStoreIndexConfig ):
299- type : Literal ["postgres" ] = "postgres"
300-
301-
302- class PostgresDocumentStore (PostgreSQLOnlineStore , DocumentStore ):
303- def online_search (
304- self ,
305- config : RepoConfig ,
306- table : FeatureView ,
307- requested_feature : str ,
308- embedding : np .ndarray ,
309- top_k : int ,
310- ):
311- result : List [Tuple [Optional [datetime ], Optional [Dict [str , ValueProto ]]]] = []
312-
313- with self ._get_conn (config ) as conn , conn .cursor () as cur :
314- cur .execute (
315- SEARCH_QUERY_TEMPLATE .format (
316- table_name = table , feature_name = requested_feature
317- ),
318- (embedding , top_k ),
319- )
320- rows = cur .fetchall ()
321-
322- for row in rows :
323- # The first column is the entity key
324- entity_key = EntityKeyProto ()
325- entity_key .ParseFromString (row [0 ])
326-
327- # The second column is the feature name
328- feature_name = row [1 ]
329-
330- # The third column is the embedding value
331- val = ValueProto ()
332- val .ParseFromString (row [2 ])
333-
334- # The fourth column is the event timestamp
335- event_ts = row [3 ]
336-
337- res = {}
338- res [feature_name ] = val
339- result .append ((event_ts , res ))
340-
341- return result
342-
343- def create_index (self , config : RepoConfig , table : str ):
344- document_store_config = config .document_store_config
345- with self ._get_conn (config ) as conn , conn .cursor () as cur :
346- cur .execute (
347- CREATE_INDEX_QUERY_TEMPLATE .format (
348- table = table ,
349- index_type = document_store_config .index_type ,
350- embeding_type = document_store_config .embedding_type ,
351- )
352- )
0 commit comments