diff --git a/.github/workflows/complete.yml b/.github/workflows/complete.yml
index 68a31560..37ee1f0f 100644
--- a/.github/workflows/complete.yml
+++ b/.github/workflows/complete.yml
@@ -53,4 +53,34 @@ jobs:
       - name: Install dependencies
         run: make install-python-ci-dependencies
       - name: Lint python
-        run: make lint-python
\ No newline at end of file
+        run: make lint-python
+
+
+  publish-ingestion-jar:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v2
+      - uses: actions/setup-java@v1
+        with:
+          java-version: '11'
+      - name: Cache local Maven repository
+        uses: actions/cache@v2
+        with:
+          path: ~/.m2/repository
+          key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }}
+          restore-keys: |
+            ${{ runner.os }}-maven-
+      - name: Build ingestion jar (no tests)
+        env:
+          # Add retries to prevent connection resets:
+          # https://github.community/t/getting-maven-could-not-transfer-artifact-with-500-error-when-using-github-actions/17570
+          # https://github.com/actions/virtual-environments/issues/1499#issuecomment-718396233
+          MAVEN_OPTS: -Dmaven.wagon.httpconnectionManager.ttlSeconds=25 -Dmaven.wagon.http.retryHandler.count=3 -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false
+          MAVEN_EXTRA_OPTS: -X
+        run: make build-java-no-tests REVISION=develop
+      - name: Upload ingestion jar
+        uses: actions/upload-artifact@v2
+        with:
+          name: ingestion-jar
+          path: spark/ingestion/target/feast-ingestion-spark-develop.jar
+          retention-days: 1
diff --git a/.prow.yaml b/.prow.yaml
index 23aa9d63..55c2baa9 100644
--- a/.prow.yaml
+++ b/.prow.yaml
@@ -153,3 +153,26 @@ presubmits:
     - name: service-account
       secret:
         secretName: feast-service-account
+- name: python-sdk-integration-test
+  decorate: true
+  always_run: true
+  spec:
+    containers:
+    - image: gcr.io/kf-feast/feast-ci:latest
+      command: ["infra/scripts/test-integration.sh"]
+      resources:
+        requests:
+          cpu: "1"
+          memory: "3072Mi"
+      env:
+      - name: GOOGLE_APPLICATION_CREDENTIALS
+        value: /etc/gcloud/service-account.json
+      volumeMounts:
+      - mountPath: /etc/gcloud/service-account.json
+        name: service-account
+        readOnly: true
+        subPath: service-account.json
+    volumes:
+    - name: service-account
+      secret:
+        secretName: feast-service-account
\ No newline at end of file
diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py
new file mode 100644
index 00000000..a80e6948
--- /dev/null
+++ b/tests/integration/conftest.py
@@ -0,0 +1,10 @@
+def pytest_addoption(parser):
+    parser.addoption("--dataproc-cluster-name", action="store")
+    parser.addoption("--dataproc-region", action="store")
+    parser.addoption("--dataproc-project", action="store")
+    parser.addoption("--dataproc-staging-location", action="store")
+    parser.addoption("--dataproc-executor-instances", action="store", default="2")
+    parser.addoption("--dataproc-executor-cores", action="store", default="2")
+    parser.addoption("--dataproc-executor-memory", action="store", default="2g")
+    parser.addoption("--redis-url", action="store")
+    parser.addoption("--redis-cluster", action="store_true")
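A note on wiring: the prow presubmit above runs `infra/scripts/test-integration.sh`, which is not part of this diff, so exactly how it passes the conftest.py options to pytest is not shown here. A hypothetical local equivalent, with placeholder flag values, would look like:

```python
# Hypothetical local run of the new integration suite. All flag values below
# are placeholders; the real ones come from test-integration.sh (not in this diff).
import sys

import pytest

sys.exit(
    pytest.main(
        [
            "tests/integration",
            "--dataproc-cluster-name", "feast-e2e",                           # placeholder
            "--dataproc-region", "us-central1",                               # placeholder
            "--dataproc-project", "kf-feast",                                 # placeholder
            "--dataproc-staging-location", "gs://feast-staging/integration",  # placeholder
            "--redis-url", "localhost:6379",                                  # placeholder
        ]
    )
)
```

The executor flags and `--redis-cluster` are omitted here, so the conftest.py defaults apply (2 instances, 2 cores, 2g memory, non-cluster Redis).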
diff --git a/tests/integration/fixtures/__init__.py b/tests/integration/fixtures/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/tests/integration/fixtures/job_parameters.py b/tests/integration/fixtures/job_parameters.py
new file mode 100644
index 00000000..a3327e36
--- /dev/null
+++ b/tests/integration/fixtures/job_parameters.py
@@ -0,0 +1,141 @@
+import tempfile
+import uuid
+from datetime import datetime
+from os import path
+from urllib.parse import urlparse
+
+import numpy as np
+import pandas as pd
+import pytest
+from google.cloud import storage
+from pytz import utc
+
+from feast_spark.pyspark.abc import RetrievalJobParameters
+
+
+@pytest.fixture(scope="module")
+def customer_entity() -> pd.DataFrame:
+    return pd.DataFrame(
+        np.array([[1001, datetime(year=2020, month=9, day=1, tzinfo=utc)]]),
+        columns=["customer", "event_timestamp"],
+    )
+
+
+@pytest.fixture(scope="module")
+def customer_feature() -> pd.DataFrame:
+    return pd.DataFrame(
+        np.array(
+            [
+                [
+                    1001,
+                    100.0,
+                    datetime(year=2020, month=9, day=1, tzinfo=utc),
+                    datetime(year=2020, month=9, day=1, tzinfo=utc),
+                ],
+            ]
+        ),
+        columns=[
+            "customer",
+            "total_transactions",
+            "event_timestamp",
+            "created_timestamp",
+        ],
+    )
+
+
+# Stages a dataframe to the GCS staging location as Parquet and returns its gs:// URI.
+def upload_dataframe_to_gcs_as_parquet(df: pd.DataFrame, staging_location: str):
+    gcs_client = storage.Client()
+    staging_location_uri = urlparse(staging_location)
+    staging_bucket = staging_location_uri.netloc
+    remote_path = staging_location_uri.path.lstrip("/")
+    gcs_bucket = gcs_client.get_bucket(staging_bucket)
+    temp_dir = str(uuid.uuid4())
+    df_remote_path = path.join(remote_path, temp_dir)
+    blob = gcs_bucket.blob(df_remote_path)
+    with tempfile.NamedTemporaryFile() as df_local_path:
+        df.to_parquet(df_local_path.name)
+        blob.upload_from_filename(df_local_path.name)
+    # The blob lives at <staging_location>/<temp_dir>; joining staging_location
+    # with df_remote_path would repeat the staging path component.
+    return path.join(staging_location, temp_dir)
+
+
+def new_retrieval_job_params(
+    entity_source_uri: str,
+    feature_source_uri: str,
+    destination_uri: str,
+    output_format: str,
+) -> RetrievalJobParameters:
+    entity_source = {
+        "file": {
+            "format": {"json_class": "ParquetFormat"},
+            "path": entity_source_uri,
+            "event_timestamp_column": "event_timestamp",
+        }
+    }
+
+    feature_tables_sources = [
+        {
+            "file": {
+                "format": {"json_class": "ParquetFormat"},
+                "path": feature_source_uri,
+                "event_timestamp_column": "event_timestamp",
+                "created_timestamp_column": "created_timestamp",
+            }
+        }
+    ]
+
+    feature_tables = [
+        {
+            "name": "customer_transactions",
+            "entities": [{"name": "customer", "type": "int64"}],
+            "features": [{"name": "total_transactions", "type": "double"}],
+        }
+    ]
+
+    destination = {"format": output_format, "path": destination_uri}
+
+    return RetrievalJobParameters(
+        feature_tables=feature_tables,
+        feature_tables_sources=feature_tables_sources,
+        entity_source=entity_source,
+        destination=destination,
+        extra_packages=["com.linkedin.sparktfrecord:spark-tfrecord_2.12:0.3.0"],
+    )
+
+
+@pytest.fixture(scope="module")
+def dataproc_retrieval_job_params(
+    pytestconfig, customer_entity, customer_feature
+) -> RetrievalJobParameters:
+    staging_location = pytestconfig.getoption("--dataproc-staging-location")
+    entity_source_uri = upload_dataframe_to_gcs_as_parquet(
+        customer_entity, staging_location
+    )
+    feature_source_uri = upload_dataframe_to_gcs_as_parquet(
+        customer_feature, staging_location
+    )
+    destination_uri = path.join(staging_location, str(uuid.uuid4()))
+
+    return new_retrieval_job_params(
+        entity_source_uri, feature_source_uri, destination_uri, "parquet"
+    )
+
+
+@pytest.fixture(scope="module")
+def dataproc_retrieval_job_params_with_tfrecord_output(
+    pytestconfig, customer_entity, customer_feature
+) -> RetrievalJobParameters:
+    staging_location = pytestconfig.getoption("--dataproc-staging-location")
+    entity_source_uri = upload_dataframe_to_gcs_as_parquet(
+        customer_entity, staging_location
+    )
+    feature_source_uri = upload_dataframe_to_gcs_as_parquet(
+        customer_feature, staging_location
+    )
+    destination_uri = path.join(staging_location, str(uuid.uuid4()))
+
+    return new_retrieval_job_params(
+        entity_source_uri, feature_source_uri, destination_uri, "tfrecord"
+    )
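The fixtures above stage a one-row entity dataframe and a one-row feature dataframe, so the retrieval job has exactly one correct answer: customer 1001 with total_transactions 100.0. A rough pandas sketch of the point-in-time join semantics being exercised follows (the authoritative implementation is the Spark job in feast-spark; this is only an illustration):

```python
# Pandas illustration of the point-in-time join the retrieval job performs on
# the fixture data: join on the entity key, keep feature rows observed at or
# before the entity timestamp, then take the latest by event/created timestamp.
from datetime import datetime

import pandas as pd
from pytz import utc

entity = pd.DataFrame(
    {"customer": [1001], "event_timestamp": [datetime(2020, 9, 1, tzinfo=utc)]}
)
feature = pd.DataFrame(
    {
        "customer": [1001],
        "total_transactions": [100.0],
        "event_timestamp": [datetime(2020, 9, 1, tzinfo=utc)],
        "created_timestamp": [datetime(2020, 9, 1, tzinfo=utc)],
    }
)

merged = entity.merge(feature, on="customer", suffixes=("", "_feature"))
merged = merged[merged["event_timestamp_feature"] <= merged["event_timestamp"]]
latest = (
    merged.sort_values(["event_timestamp_feature", "created_timestamp"])
    .groupby(["customer", "event_timestamp"], as_index=False)
    .last()
)

print(latest[["customer", "event_timestamp", "total_transactions"]])
# -> customer 1001 gets total_transactions 100.0
```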
diff --git a/tests/integration/fixtures/launchers.py b/tests/integration/fixtures/launchers.py
new file mode 100644
index 00000000..4055c3e0
--- /dev/null
+++ b/tests/integration/fixtures/launchers.py
@@ -0,0 +1,23 @@
+import pytest
+
+from feast_spark.pyspark.launchers.gcloud import DataprocClusterLauncher
+
+
+@pytest.fixture
+def dataproc_launcher(pytestconfig) -> DataprocClusterLauncher:
+    cluster_name = pytestconfig.getoption("--dataproc-cluster-name")
+    region = pytestconfig.getoption("--dataproc-region")
+    project_id = pytestconfig.getoption("--dataproc-project")
+    staging_location = pytestconfig.getoption("--dataproc-staging-location")
+    executor_instances = pytestconfig.getoption("--dataproc-executor-instances")
+    executor_cores = pytestconfig.getoption("--dataproc-executor-cores")
+    executor_memory = pytestconfig.getoption("--dataproc-executor-memory")
+    return DataprocClusterLauncher(
+        cluster_name=cluster_name,
+        staging_location=staging_location,
+        region=region,
+        project_id=project_id,
+        executor_instances=executor_instances,
+        executor_cores=executor_cores,
+        executor_memory=executor_memory,
+    )
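A possible simplification, since the two retrieval-params fixtures differ only in their output format argument: pytest fixture parametrization could fold them into one. A hypothetical refactor sketch, not part of this diff (it assumes the repository root is importable so the `tests.integration...` path resolves):

```python
# Hypothetical replacement for the two dataproc_retrieval_job_params* fixtures:
# one module-scoped fixture parametrized over the output format.
import uuid
from os import path

import pytest

from tests.integration.fixtures.job_parameters import (  # noqa: F401
    customer_entity,
    customer_feature,
    new_retrieval_job_params,
    upload_dataframe_to_gcs_as_parquet,
)


@pytest.fixture(scope="module", params=["parquet", "tfrecord"])
def retrieval_job_params(request, pytestconfig, customer_entity, customer_feature):
    staging_location = pytestconfig.getoption("--dataproc-staging-location")
    entity_uri = upload_dataframe_to_gcs_as_parquet(customer_entity, staging_location)
    feature_uri = upload_dataframe_to_gcs_as_parquet(customer_feature, staging_location)
    destination_uri = path.join(staging_location, str(uuid.uuid4()))
    # request.param is "parquet" or "tfrecord".
    return new_retrieval_job_params(entity_uri, feature_uri, destination_uri, request.param)
```

Each test that takes `retrieval_job_params` would then run once per format, at the cost of submitting one Dataproc job per parameter.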
diff --git a/tests/integration/test_launchers.py b/tests/integration/test_launchers.py
new file mode 100644
index 00000000..19f91ec3
--- /dev/null
+++ b/tests/integration/test_launchers.py
@@ -0,0 +1,62 @@
+import time
+
+from feast_spark.pyspark.abc import RetrievalJobParameters, SparkJob, SparkJobStatus
+from feast_spark.pyspark.launchers.gcloud import DataprocClusterLauncher
+
+from .fixtures.job_parameters import customer_entity  # noqa: F401
+from .fixtures.job_parameters import customer_feature  # noqa: F401
+from .fixtures.job_parameters import dataproc_retrieval_job_params  # noqa: F401
+from .fixtures.job_parameters import (  # noqa: F401
+    dataproc_retrieval_job_params_with_tfrecord_output,
+)
+from .fixtures.launchers import dataproc_launcher  # noqa: F401
+
+
+def wait_for_job_status(
+    job: SparkJob,
+    expected_status: SparkJobStatus,
+    max_retry: int = 4,
+    retry_interval: int = 5,
+):
+    for _ in range(max_retry):
+        if job.get_status() == expected_status:
+            return
+        time.sleep(retry_interval)
+    raise ValueError(f"Timeout waiting for job status to become {expected_status.name}")
+
+
+def test_dataproc_job_api(
+    dataproc_launcher: DataprocClusterLauncher,  # noqa: F811
+    dataproc_retrieval_job_params: RetrievalJobParameters,  # noqa: F811
+):
+    job = dataproc_launcher.historical_feature_retrieval(dataproc_retrieval_job_params)
+    job_id = job.get_id()
+    retrieved_job = dataproc_launcher.get_job_by_id(job_id)
+    assert retrieved_job.get_log_uri() is not None
+    assert retrieved_job.get_id() == job_id
+    status = retrieved_job.get_status()
+    assert status in [
+        SparkJobStatus.STARTING,
+        SparkJobStatus.IN_PROGRESS,
+        SparkJobStatus.COMPLETED,
+    ]
+    active_job_ids = [
+        job.get_id() for job in dataproc_launcher.list_jobs(include_terminated=False)
+    ]
+    assert job_id in active_job_ids
+    # Wait until the job is actually running before cancelling it.
+    wait_for_job_status(retrieved_job, SparkJobStatus.IN_PROGRESS)
+    retrieved_job.cancel()
+    assert retrieved_job.get_status() == SparkJobStatus.FAILED
+
+
+def test_dataproc_job_tfrecord_output(
+    dataproc_launcher: DataprocClusterLauncher,  # noqa: F811
+    dataproc_retrieval_job_params_with_tfrecord_output: RetrievalJobParameters,  # noqa: F811
+):
+    job = dataproc_launcher.historical_feature_retrieval(
+        dataproc_retrieval_job_params_with_tfrecord_output
+    )
+    # get_output_file_uri() blocks until the job reaches a terminal state.
+    job.get_output_file_uri()
+    assert job.get_status() == SparkJobStatus.COMPLETED
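Neither test inspects what the job actually wrote; `test_dataproc_job_tfrecord_output` only asserts completion. If content verification were wanted, a helper along these lines could read the Parquet output back, assuming `get_output_file_uri()` returns a `gs://` path and the `gcsfs` backend for pandas is installed (both assumptions, not established by this diff):

```python
# Sketch of an output-content assertion; gcsfs must be installed for pandas to
# read gs:// paths, and the job is assumed to expose its output URI.
import pandas as pd


def assert_retrieval_output(job, expected: pd.DataFrame):
    output_uri = job.get_output_file_uri()  # blocks until the job finishes
    actual = pd.read_parquet(output_uri)    # requires gcsfs
    pd.testing.assert_frame_equal(
        actual.sort_values("customer").reset_index(drop=True),
        expected.sort_values("customer").reset_index(drop=True),
        check_dtype=False,
    )
```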