diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index 249ea5afce..726ef1a5ba 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -178,6 +178,7 @@ The services with containerized replacements currently implemented are:
 - Redis
 - Trino
 - HBase
+- Postgres
 
 You can run `make test-python-integration-container` to run tests against the containerized versions of dependencies.
 
diff --git a/Makefile b/Makefile
index 0ee6fcb685..dec9354529 100644
--- a/Makefile
+++ b/Makefile
@@ -79,12 +79,15 @@ test-python-universal-postgres:
 	FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.offline_stores.contrib.postgres_repo_configuration \
 	FEAST_USAGE=False \
 	IS_TEST=True \
-	python -m pytest --integration --universal \
+	python -m pytest -x --integration --universal \
 	-k "not test_historical_retrieval_fails_on_validation and \
 	not test_historical_retrieval_with_validation and \
 	not test_historical_features_persisting and \
 	not test_historical_retrieval_fails_on_validation and \
-	not test_universal_cli" \
+	not test_universal_cli and \
+	not test_go_feature_server and \
+	not test_feature_logging and \
+	not test_universal_types" \
 	sdk/python/tests
 
 test-python-universal-local:
diff --git a/sdk/python/feast/infra/offline_stores/contrib/postgres_repo_configuration.py b/sdk/python/feast/infra/offline_stores/contrib/postgres_repo_configuration.py
index 91ab686895..bb7fd0caf5 100644
--- a/sdk/python/feast/infra/offline_stores/contrib/postgres_repo_configuration.py
+++ b/sdk/python/feast/infra/offline_stores/contrib/postgres_repo_configuration.py
@@ -5,20 +5,10 @@
     PostgreSQLDataSourceCreator,
 )
 
-POSTGRES_ONLINE_CONFIG = {
-    "type": "postgres",
-    "host": "localhost",
-    "port": "5432",
-    "database": "postgres",
-    "db_schema": "feature_store",
-    "user": "postgres",
-    "password": "docker",
-}
-
 FULL_REPO_CONFIGS = [
     IntegrationTestRepoConfig(
         provider="local",
         offline_store_creator=PostgreSQLDataSourceCreator,
-        online_store=POSTGRES_ONLINE_CONFIG,
+        online_store_creator=PostgreSQLDataSourceCreator,
     ),
 ]
diff --git a/sdk/python/feast/infra/online_stores/contrib/postgres.py b/sdk/python/feast/infra/online_stores/contrib/postgres.py
index 479b601806..14ff806e4b 100644
--- a/sdk/python/feast/infra/online_stores/contrib/postgres.py
+++ b/sdk/python/feast/infra/online_stores/contrib/postgres.py
@@ -64,7 +64,7 @@ def online_write_batch(
                         created_ts,
                     )
                 )
-            # Controll the batch so that we can update the progress
+            # Control the batch so that we can update the progress
             batch_size = 5000
             for i in range(0, len(insert_values), batch_size):
                 cur_batch = insert_values[i : i + batch_size]
diff --git a/sdk/python/tests/conftest.py b/sdk/python/tests/conftest.py
index 2ff97649d6..b57152c82d 100644
--- a/sdk/python/tests/conftest.py
+++ b/sdk/python/tests/conftest.py
@@ -204,16 +204,68 @@ def teardown():
     return TrinoContainerSingleton
 
 
+class PostgresContainerSingleton:
+    container = None
+    is_running = False
+
+    postgres_user = "test"
+    postgres_password = "test"
+    postgres_db = "test"
+
+    @classmethod
+    def get_singleton(cls):
+        if not cls.is_running:
+            cls.container = (
+                DockerContainer("postgres:latest")
+                .with_exposed_ports(5432)
+                .with_env("POSTGRES_USER", cls.postgres_user)
+                .with_env("POSTGRES_PASSWORD", cls.postgres_password)
+                .with_env("POSTGRES_DB", cls.postgres_db)
+            )
+
+            cls.container.start()
+            log_string_to_wait_for = "database system is ready to accept connections"
+            waited = wait_for_logs(
+                container=cls.container,
+                predicate=log_string_to_wait_for,
+                timeout=30,
+                interval=10,
+            )
+            logger.info("Waited for %s seconds until postgres container was up", waited)
+            cls.is_running = True
+        return cls.container
+
+    @classmethod
+    def teardown(cls):
+        if cls.container:
+            cls.container.stop()
+
+
+@pytest.fixture(scope="session")
+def postgres_fixture(request):
+    def teardown():
+        PostgresContainerSingleton.teardown()
+
+    request.addfinalizer(teardown)
+    return PostgresContainerSingleton
+
+
 @pytest.fixture(
     params=FULL_REPO_CONFIGS, scope="session", ids=[str(c) for c in FULL_REPO_CONFIGS]
 )
-def environment(request, worker_id: str, trino_fixture):
+def environment(request, worker_id: str, trino_fixture, postgres_fixture):
     if "TrinoSourceCreator" in request.param.offline_store_creator.__name__:
         e = construct_test_environment(
             request.param,
             worker_id=worker_id,
             offline_container=trino_fixture.get_singleton(),
         )
+    elif "PostgreSQLDataSourceCreator" in request.param.offline_store_creator.__name__:
+        e = construct_test_environment(
+            request.param,
+            worker_id=worker_id,
+            offline_container=postgres_fixture.get_singleton(),
+        )
     else:
         e = construct_test_environment(request.param, worker_id=worker_id)
     proc = Process(
diff --git a/sdk/python/tests/integration/feature_repos/universal/data_sources/postgres.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/postgres.py
index de5df6496f..134234f93c 100644
--- a/sdk/python/tests/integration/feature_repos/universal/data_sources/postgres.py
+++ b/sdk/python/tests/integration/feature_repos/universal/data_sources/postgres.py
@@ -1,38 +1,86 @@
-from typing import Dict, List, Optional
+import logging
+from typing import Dict, Optional
 
 import pandas as pd
+from testcontainers.core.container import DockerContainer
+from testcontainers.core.waiting_utils import wait_for_logs
 
 from feast.data_source import DataSource
 from feast.infra.offline_stores.contrib.postgres_offline_store.postgres import (
     PostgreSQLOfflineStoreConfig,
     PostgreSQLSource,
 )
-from feast.infra.utils.postgres.connection_utils import _get_conn, df_to_postgres_table
+from feast.infra.utils.postgres.connection_utils import df_to_postgres_table
 from feast.repo_config import FeastConfigBaseModel
 from tests.integration.feature_repos.universal.data_source_creator import (
     DataSourceCreator,
 )
+from tests.integration.feature_repos.universal.online_store_creator import (
+    OnlineStoreCreator,
+)
 
+logger = logging.getLogger(__name__)
 
-class PostgreSQLDataSourceCreator(DataSourceCreator):
-    tables: List[str] = []
 
-    def __init__(self, project_name: str, *args, **kwargs):
-        super().__init__(project_name)
-        self.project_name = project_name
+class PostgresSourceCreatorSingleton:
+    postgres_user = "test"
+    postgres_password = "test"
+    postgres_db = "test"
+
+    running = False
+
+    project_name = None
+    container = None
+    provided_container = None
 
-        self.offline_store_config = PostgreSQLOfflineStoreConfig(
+    offline_store_config = None
+
+    @classmethod
+    def initialize(cls, project_name: str, *args, **kwargs):
+        cls.project_name = project_name
+
+        if "offline_container" not in kwargs or not kwargs.get(
+            "offline_container", None
+        ):
+            # If we don't get an offline container provided, we try to create
+            # it on the fly. The problem here is that each test creates its own
+            # container, which basically browns out developer laptops.
+            cls.container = (
+                DockerContainer("postgres:latest")
+                .with_exposed_ports(5432)
+                .with_env("POSTGRES_USER", cls.postgres_user)
+                .with_env("POSTGRES_PASSWORD", cls.postgres_password)
+                .with_env("POSTGRES_DB", cls.postgres_db)
+            )
+
+            cls.container.start()
+            cls.provided_container = False
+            log_string_to_wait_for = "database system is ready to accept connections"
+            waited = wait_for_logs(
+                container=cls.container,
+                predicate=log_string_to_wait_for,
+                timeout=30,
+                interval=10,
+            )
+            logger.info("Waited for %s seconds until postgres container was up", waited)
+            cls.running = True
+        else:
+            cls.provided_container = True
+            cls.container = kwargs["offline_container"]
+
+        cls.offline_store_config = PostgreSQLOfflineStoreConfig(
             type="postgres",
             host="localhost",
-            port=5432,
-            database="postgres",
+            port=cls.container.get_exposed_port(5432),
+            database=cls.container.env["POSTGRES_DB"],
             db_schema="public",
-            user="postgres",
-            password="docker",
+            user=cls.container.env["POSTGRES_USER"],
+            password=cls.container.env["POSTGRES_PASSWORD"],
         )
 
+    @classmethod
     def create_data_source(
-        self,
+        cls,
         df: pd.DataFrame,
         destination_name: str,
         suffix: Optional[str] = None,
@@ -41,11 +89,10 @@
         field_mapping: Dict[str, str] = None,
     ) -> DataSource:
 
-        destination_name = self.get_prefixed_table_name(destination_name)
+        destination_name = cls.get_prefixed_table_name(destination_name)
 
-        df_to_postgres_table(self.offline_store_config, df, destination_name)
-
-        self.tables.append(destination_name)
+        if cls.offline_store_config:
+            df_to_postgres_table(cls.offline_store_config, df, destination_name)
 
         return PostgreSQLSource(
             name=destination_name,
@@ -55,17 +102,85 @@
             field_mapping=field_mapping or {"ts_1": "ts"},
         )
 
+    @classmethod
+    def create_offline_store_config(cls) -> PostgreSQLOfflineStoreConfig:
+        assert cls.offline_store_config
+        return cls.offline_store_config
+
+    @classmethod
+    def get_prefixed_table_name(cls, suffix: str) -> str:
+        return f"{cls.project_name}_{suffix}"
+
+    @classmethod
+    def create_online_store(cls) -> Dict[str, str]:
+        assert cls.container
+        return {
+            "type": "postgres",
+            "host": "localhost",
+            "port": cls.container.get_exposed_port(5432),
+            "database": cls.postgres_db,
+            "db_schema": "feature_store",
+            "user": cls.postgres_user,
+            "password": cls.postgres_password,
+        }
+
+    @classmethod
+    def create_saved_dataset_destination(cls):
+        # FIXME: ...
+        return None
+
+    @classmethod
+    def teardown(cls):
+        if not cls.provided_container and cls.running:
+            cls.container.stop()
+            cls.running = False
+            cls.container = None
+            cls.project_name = None
+
+
+class PostgreSQLDataSourceCreator(DataSourceCreator, OnlineStoreCreator):
+
+    postgres_user = "test"
+    postgres_password = "test"
+    postgres_db = "test"
+
+    running = False
+
+    def __init__(self, project_name: str, *args, **kwargs):
+        super().__init__(project_name)
+        PostgresSourceCreatorSingleton.initialize(project_name, *args, **kwargs)
+
+    def create_data_source(
+        self,
+        df: pd.DataFrame,
+        destination_name: str,
+        suffix: Optional[str] = None,
+        timestamp_field="ts",
+        created_timestamp_column="created_ts",
+        field_mapping: Dict[str, str] = None,
+    ) -> DataSource:
+
+        return PostgresSourceCreatorSingleton.create_data_source(
+            df,
+            destination_name,
+            suffix,
+            timestamp_field,
+            created_timestamp_column,
+            field_mapping,
+        )
+
     def create_offline_store_config(self) -> FeastConfigBaseModel:
-        return self.offline_store_config
+        return PostgresSourceCreatorSingleton.create_offline_store_config()
 
     def get_prefixed_table_name(self, suffix: str) -> str:
-        return f"{self.project_name}_{suffix}"
+        return PostgresSourceCreatorSingleton.get_prefixed_table_name(suffix)
+
+    def create_online_store(self) -> Dict[str, str]:
+        return PostgresSourceCreatorSingleton.create_online_store()
 
     def create_saved_dataset_destination(self):
         # FIXME: ...
         return None
 
     def teardown(self):
-        with _get_conn(self.offline_store_config) as conn, conn.cursor() as cur:
-            for table in self.tables:
-                cur.execute("DROP TABLE IF EXISTS " + table)
+        PostgresSourceCreatorSingleton.teardown()
diff --git a/setup.py b/setup.py
index 1ecc3df820..8736482f66 100644
--- a/setup.py
+++ b/setup.py
@@ -150,7 +150,7 @@
     "pytest-mock==1.10.4",
     "Sphinx!=4.0.0,<4.4.0",
     "sphinx-rtd-theme",
-    "testcontainers>=3.5",
+    "testcontainers[postgresql]>=3.5",
     "adlfs==0.5.9",
     "firebase-admin==4.5.2",
     "pre-commit",
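
For anyone who wants to exercise the container plumbing outside of pytest, below is a minimal, self-contained sketch of the testcontainers pattern this change standardizes on. It assumes a local Docker daemon and `testcontainers[postgresql]>=3.5` (per the setup.py change above); the image, env vars, and the final config dict mirror `PostgresContainerSingleton` and `create_online_store()` from the diff, and the script itself is illustrative only, not part of the change.

import logging

from testcontainers.core.container import DockerContainer
from testcontainers.core.waiting_utils import wait_for_logs

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Same image and environment as PostgresContainerSingleton above.
container = (
    DockerContainer("postgres:latest")
    .with_exposed_ports(5432)
    .with_env("POSTGRES_USER", "test")
    .with_env("POSTGRES_PASSWORD", "test")
    .with_env("POSTGRES_DB", "test")
)
container.start()
try:
    # Block until the server reports readiness, as the fixtures do.
    waited = wait_for_logs(
        container=container,
        predicate="database system is ready to accept connections",
        timeout=30,
        interval=10,
    )
    logger.info("Postgres ready after %s seconds", waited)

    # Docker maps 5432 to a random host port, hence get_exposed_port();
    # this dict mirrors what create_online_store() returns.
    online_store_config = {
        "type": "postgres",
        "host": "localhost",
        "port": container.get_exposed_port(5432),
        "database": "test",
        "db_schema": "feature_store",
        "user": "test",
        "password": "test",
    }
    logger.info("Online store config: %s", online_store_config)
finally:
    container.stop()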