1- from typing import Dict , List , Optional
1+ import logging
2+ from typing import Dict , Optional
23
34import pandas as pd
5+ from testcontainers .core .container import DockerContainer
6+ from testcontainers .core .waiting_utils import wait_for_logs
47
58from feast .data_source import DataSource
69from feast .infra .offline_stores .contrib .postgres_offline_store .postgres import (
710 PostgreSQLOfflineStoreConfig ,
811 PostgreSQLSource ,
912)
10- from feast .infra .utils .postgres .connection_utils import _get_conn , df_to_postgres_table
13+ from feast .infra .utils .postgres .connection_utils import df_to_postgres_table
1114from feast .repo_config import FeastConfigBaseModel
1215from tests .integration .feature_repos .universal .data_source_creator import (
1316 DataSourceCreator ,
1417)
18+ from tests .integration .feature_repos .universal .online_store_creator import (
19+ OnlineStoreCreator ,
20+ )
1521
22+ logger = logging .getLogger (__name__ )
1623
17- class PostgreSQLDataSourceCreator (DataSourceCreator ):
18- tables : List [str ] = []
1924
20- def __init__ (self , project_name : str , * args , ** kwargs ):
21- super ().__init__ (project_name )
22- self .project_name = project_name
25+ class PostgresSourceCreatorSingleton :
26+ postgres_user = "test"
27+ postgres_password = "test"
28+ postgres_db = "test"
29+
30+ running = False
31+
32+ project_name = None
33+ container = None
34+ provided_container = None
2335
24- self .offline_store_config = PostgreSQLOfflineStoreConfig (
36+ offline_store_config = None
37+
38+ @classmethod
39+ def initialize (cls , project_name : str , * args , ** kwargs ):
40+ cls .project_name = project_name
41+
42+ if "offline_container" not in kwargs or not kwargs .get (
43+ "offline_container" , None
44+ ):
45+ # If we don't get an offline container provided, we try to create it on the fly.
46+ # the problem here is that each test creates its own container, which basically
47+ # browns out developer laptops.
48+ cls .container = (
49+ DockerContainer ("postgres:latest" )
50+ .with_exposed_ports (5432 )
51+ .with_env ("POSTGRES_USER" , cls .postgres_user )
52+ .with_env ("POSTGRES_PASSWORD" , cls .postgres_password )
53+ .with_env ("POSTGRES_DB" , cls .postgres_db )
54+ )
55+
56+ cls .container .start ()
57+ cls .provided_container = False
58+ log_string_to_wait_for = "database system is ready to accept connections"
59+ waited = wait_for_logs (
60+ container = cls .container ,
61+ predicate = log_string_to_wait_for ,
62+ timeout = 30 ,
63+ interval = 10 ,
64+ )
65+ logger .info ("Waited for %s seconds until postgres container was up" , waited )
66+ cls .running = True
67+ else :
68+ cls .provided_container = True
69+ cls .container = kwargs ["offline_container" ]
70+
71+ cls .offline_store_config = PostgreSQLOfflineStoreConfig (
2572 type = "postgres" ,
2673 host = "localhost" ,
27- port = 5432 ,
28- database = "postgres" ,
74+ port = cls . container . get_exposed_port ( 5432 ) ,
75+ database = cls . container . env [ "POSTGRES_DB" ] ,
2976 db_schema = "public" ,
30- user = "postgres" ,
31- password = "docker" ,
77+ user = cls . container . env [ "POSTGRES_USER" ] ,
78+ password = cls . container . env [ "POSTGRES_PASSWORD" ] ,
3279 )
3380
81+ @classmethod
3482 def create_data_source (
35- self ,
83+ cls ,
3684 df : pd .DataFrame ,
3785 destination_name : str ,
3886 suffix : Optional [str ] = None ,
@@ -41,11 +89,10 @@ def create_data_source(
4189 field_mapping : Dict [str , str ] = None ,
4290 ) -> DataSource :
4391
44- destination_name = self .get_prefixed_table_name (destination_name )
92+ destination_name = cls .get_prefixed_table_name (destination_name )
4593
46- df_to_postgres_table (self .offline_store_config , df , destination_name )
47-
48- self .tables .append (destination_name )
94+ if cls .offline_store_config :
95+ df_to_postgres_table (cls .offline_store_config , df , destination_name )
4996
5097 return PostgreSQLSource (
5198 name = destination_name ,
@@ -55,17 +102,85 @@ def create_data_source(
55102 field_mapping = field_mapping or {"ts_1" : "ts" },
56103 )
57104
105+ @classmethod
106+ def create_offline_store_config (cls ) -> PostgreSQLOfflineStoreConfig :
107+ assert cls .offline_store_config
108+ return cls .offline_store_config
109+
110+ @classmethod
111+ def get_prefixed_table_name (cls , suffix : str ) -> str :
112+ return f"{ cls .project_name } _{ suffix } "
113+
114+ @classmethod
115+ def create_online_store (cls ) -> Dict [str , str ]:
116+ assert cls .container
117+ return {
118+ "type" : "postgres" ,
119+ "host" : "localhost" ,
120+ "port" : cls .container .get_exposed_port (5432 ),
121+ "database" : cls .postgres_db ,
122+ "db_schema" : "feature_store" ,
123+ "user" : cls .postgres_user ,
124+ "password" : cls .postgres_password ,
125+ }
126+
127+ @classmethod
128+ def create_saved_dataset_destination (cls ):
129+ # FIXME: ...
130+ return None
131+
132+ @classmethod
133+ def teardown (cls ):
134+ if not cls .provided_container and cls .running :
135+ cls .container .stop ()
136+ cls .running = False
137+ cls .container = None
138+ cls .project = None
139+
140+
141+ class PostgreSQLDataSourceCreator (DataSourceCreator , OnlineStoreCreator ):
142+
143+ postgres_user = "test"
144+ postgres_password = "test"
145+ postgres_db = "test"
146+
147+ running = False
148+
149+ def __init__ (self , project_name : str , * args , ** kwargs ):
150+ super ().__init__ (project_name )
151+ PostgresSourceCreatorSingleton .initialize (project_name , args , kwargs )
152+
153+ def create_data_source (
154+ self ,
155+ df : pd .DataFrame ,
156+ destination_name : str ,
157+ suffix : Optional [str ] = None ,
158+ timestamp_field = "ts" ,
159+ created_timestamp_column = "created_ts" ,
160+ field_mapping : Dict [str , str ] = None ,
161+ ) -> DataSource :
162+
163+ return PostgresSourceCreatorSingleton .create_data_source (
164+ df ,
165+ destination_name ,
166+ suffix ,
167+ timestamp_field ,
168+ created_timestamp_column ,
169+ field_mapping ,
170+ )
171+
58172 def create_offline_store_config (self ) -> FeastConfigBaseModel :
59- return self . offline_store_config
173+ return PostgresSourceCreatorSingleton . create_offline_store_config ()
60174
61175 def get_prefixed_table_name (self , suffix : str ) -> str :
62- return f"{ self .project_name } _{ suffix } "
176+ return PostgresSourceCreatorSingleton .get_prefixed_table_name (suffix )
177+
178+ def create_online_store (self ) -> Dict [str , str ]:
179+ return PostgresSourceCreatorSingleton .create_online_store ()
63180
64181 def create_saved_dataset_destination (self ):
65182 # FIXME: ...
66183 return None
67184
68185 def teardown (self ):
69- with _get_conn (self .offline_store_config ) as conn , conn .cursor () as cur :
70- for table in self .tables :
71- cur .execute ("DROP TABLE IF EXISTS " + table )
186+ PostgresSourceCreatorSingleton .teardown ()
0 commit comments