From c78ea979ea95011538fd792191791a2acd4535b1 Mon Sep 17 00:00:00 2001 From: Khor Shu Heng Date: Mon, 9 May 2022 17:14:45 +0800 Subject: [PATCH] clean up e2e tests Signed-off-by: Khor Shu Heng --- infra/scripts/aws-runner.sh | 10 -- infra/scripts/azure-runner.sh | 76 -------- infra/scripts/codebuild-entrypoint.sh | 143 --------------- infra/scripts/codebuild_runner.py | 200 --------------------- infra/scripts/publish-java-sdk.sh | 72 -------- infra/scripts/publish-python-sdk.sh | 47 ----- infra/scripts/run-minikube-test.sh | 38 ---- infra/scripts/setup-e2e-env-aws.sh | 14 -- infra/scripts/setup-e2e-env-gcp.sh | 36 ---- infra/scripts/setup-e2e-local.sh | 26 --- infra/scripts/test-core-ingestion.sh | 25 --- infra/scripts/test-docker-compose.sh | 73 -------- infra/scripts/test-end-to-end-local.sh | 69 -------- infra/scripts/test-end-to-end-sparkop.sh | 2 - infra/scripts/test-golang-sdk.sh | 17 -- infra/scripts/test-integration.sh | 8 - infra/scripts/test-java-sdk.sh | 13 -- infra/scripts/test-load.sh | 116 ------------- infra/scripts/test-python-sdk.sh | 14 -- infra/scripts/test-serving.sh | 14 -- infra/scripts/test_job.yaml | 35 ---- tests/e2e/conftest.py | 59 +------ tests/e2e/fixtures/base.py | 20 --- tests/e2e/fixtures/client.py | 197 ++------------------- tests/e2e/fixtures/external_services.py | 20 --- tests/e2e/fixtures/feast_services.py | 212 ----------------------- tests/e2e/fixtures/services.py | 74 -------- tests/e2e/fixtures/statsd_stub.py | 115 ------------ tests/e2e/test_historical_features.py | 16 +- tests/e2e/test_job_scheduling.py | 2 - tests/e2e/test_online_features.py | 61 ++----- tests/e2e/test_register.py | 87 +--------- tests/e2e/test_validation.py | 35 +--- tests/requirements.txt | 3 - 34 files changed, 29 insertions(+), 1920 deletions(-) delete mode 100755 infra/scripts/aws-runner.sh delete mode 100755 infra/scripts/azure-runner.sh delete mode 100755 infra/scripts/codebuild-entrypoint.sh delete mode 100755 infra/scripts/codebuild_runner.py delete mode 100755 infra/scripts/publish-java-sdk.sh delete mode 100755 infra/scripts/publish-python-sdk.sh delete mode 100755 infra/scripts/run-minikube-test.sh delete mode 100755 infra/scripts/setup-e2e-env-aws.sh delete mode 100755 infra/scripts/setup-e2e-env-gcp.sh delete mode 100644 infra/scripts/setup-e2e-local.sh delete mode 100755 infra/scripts/test-core-ingestion.sh delete mode 100755 infra/scripts/test-docker-compose.sh delete mode 100755 infra/scripts/test-end-to-end-local.sh delete mode 100755 infra/scripts/test-golang-sdk.sh delete mode 100755 infra/scripts/test-integration.sh delete mode 100755 infra/scripts/test-java-sdk.sh delete mode 100755 infra/scripts/test-load.sh delete mode 100755 infra/scripts/test-python-sdk.sh delete mode 100755 infra/scripts/test-serving.sh delete mode 100644 infra/scripts/test_job.yaml delete mode 100644 tests/e2e/fixtures/base.py delete mode 100644 tests/e2e/fixtures/feast_services.py delete mode 100644 tests/e2e/fixtures/services.py delete mode 100644 tests/e2e/fixtures/statsd_stub.py diff --git a/infra/scripts/aws-runner.sh b/infra/scripts/aws-runner.sh deleted file mode 100755 index 604c55ad..00000000 --- a/infra/scripts/aws-runner.sh +++ /dev/null @@ -1,10 +0,0 @@ -#!/usr/bin/env bash - -set -euo pipefail - -GIT_TAG=${PULL_PULL_SHA:-${PULL_BASE_SHA}} - -source infra/scripts/k8s-common-functions.sh -wait_for_image "${DOCKER_REPOSITORY}" feast-jobservice "${GIT_TAG}" - -infra/scripts/codebuild_runner.py "$@" \ No newline at end of file diff --git a/infra/scripts/azure-runner.sh b/infra/scripts/azure-runner.sh deleted file mode 100755 index e1e35be4..00000000 --- a/infra/scripts/azure-runner.sh +++ /dev/null @@ -1,76 +0,0 @@ -#!/usr/bin/env bash - -set -euo pipefail - - -STEP_BREADCRUMB='~~~~~~~~' -SECONDS=0 -TIMEFORMAT="${STEP_BREADCRUMB} took %R seconds" - -GIT_TAG=${PULL_PULL_SHA:-${PULL_BASE_SHA}} -GIT_REMOTE_URL=https://github.com/feast-dev/feast-spark.git - -echo "########## Starting e2e tests for ${GIT_REMOTE_URL} ${GIT_TAG} ###########" - -# Note requires running in root feast directory -source infra/scripts/k8s-common-functions.sh - -# Figure out docker image versions -export CI_GIT_TAG=latest - -# Jobservice is built by this repo -export JOBSERVICE_GIT_TAG=$GIT_TAG - -# Workaround for COPY command in core docker image that pulls local maven repo into the image -# itself. -mkdir .m2 2>/dev/null || true - -# Log into k8s. -echo "${STEP_BREADCRUMB} Updating kubeconfig" -az login --service-principal -u "$AZ_SERVICE_PRINCIPAL_ID" -p "$AZ_SERVICE_PRINCIPAL_PASS" --tenant "$AZ_SERVICE_PRINCIPAL_TENANT_ID" >/dev/null -az aks get-credentials --resource-group "$RESOURCE_GROUP" --name "$AKS_CLUSTER_NAME" - -# Sanity check that kubectl is working. -echo "${STEP_BREADCRUMB} k8s sanity check" -kubectl get pods - -# e2e test - runs in sparkop namespace for consistency with AWS sparkop test. -NAMESPACE=sparkop -RELEASE=sparkop - -# Delete old helm release and PVCs -k8s_cleanup "feast-release" "$NAMESPACE" -k8s_cleanup "$RELEASE" "$NAMESPACE" - -# Wait for CI and jobservice image to be built -wait_for_image "${DOCKER_REPOSITORY}" feast-ci "${CI_GIT_TAG}" -wait_for_image "${DOCKER_REPOSITORY}" feast-jobservice "${JOBSERVICE_GIT_TAG}" - -# Helm install everything in a namespace. Note that this function will use XXX_GIT_TAG variables -# we've set above to find the image versions. -helm_install "$RELEASE" "${DOCKER_REPOSITORY}" "${GIT_TAG}" "$NAMESPACE" \ - --set "feast-jobservice.envOverrides.FEAST_AZURE_BLOB_ACCOUNT_NAME=${AZURE_BLOB_ACCOUNT_NAME}" \ - --set "feast-jobservice.envOverrides.FEAST_AZURE_BLOB_ACCOUNT_ACCESS_KEY=${AZURE_BLOB_ACCOUNT_ACCESS_KEY}" - -# Delete old test running pod if it exists -kubectl delete pod -n "$NAMESPACE" ci-test-runner 2>/dev/null || true - -# Delete all sparkapplication resources that may be left over from the previous test runs. -kubectl delete sparkapplication --all -n "$NAMESPACE" || true - -# Make sure the test pod has permissions to create sparkapplication resources -setup_sparkop_role - -# Run the test suite as a one-off pod. -echo "${STEP_BREADCRUMB} Running the test suite" -kubectl run -n "$NAMESPACE" -i ci-test-runner \ - --pod-running-timeout=5m \ - --restart=Never \ - --image="${DOCKER_REPOSITORY}/feast-ci:${CI_GIT_TAG}" \ - --env="STAGING_PATH=${STAGING_PATH}" \ - --env="FEAST_AZURE_BLOB_ACCOUNT_NAME=${AZURE_BLOB_ACCOUNT_NAME}" \ - --env="FEAST_AZURE_BLOB_ACCOUNT_ACCESS_KEY=${AZURE_BLOB_ACCOUNT_ACCESS_KEY}" \ - -- \ - bash -c "mkdir src && cd src && git clone --recursive ${GIT_REMOTE_URL} && cd feast* && git config remote.origin.fetch '+refs/pull/*:refs/remotes/origin/pull/*' && git fetch -q && git checkout ${GIT_TAG} && git submodule update --init --recursive && ./infra/scripts/setup-e2e-env-sparkop.sh && ./infra/scripts/test-end-to-end-azure.sh" - -echo "########## e2e tests took $SECONDS seconds ###########" diff --git a/infra/scripts/codebuild-entrypoint.sh b/infra/scripts/codebuild-entrypoint.sh deleted file mode 100755 index 2f266783..00000000 --- a/infra/scripts/codebuild-entrypoint.sh +++ /dev/null @@ -1,143 +0,0 @@ -#!/bin/bash - -set -euo pipefail - -export DOCKER_REPOSITORY=gcr.io/kf-feast - -STEP_BREADCRUMB='~~~~~~~~' -SECONDS=0 -TIMEFORMAT="${STEP_BREADCRUMB} took %R seconds" - -source infra/scripts/k8s-common-functions.sh - -GIT_TAG=${CODEBUILD_RESOLVED_SOURCE_VERSION} - -echo "########## Starting stage $STAGE for ${CODEBUILD_SOURCE_REPO_URL} ${GIT_TAG} ###########" - -# This seems to make builds a bit faster. -export DOCKER_BUILDKIT=1 - -# Workaround for COPY command in core docker image that pulls local maven repo into the image -# itself. -mkdir .m2 2>/dev/null || true -mkdir deps/feast/.m2 2>/dev/null || true - -# Log into k8s. -echo "${STEP_BREADCRUMB} Updating kubeconfig" -aws eks update-kubeconfig --name "$EKS_CLUSTER_NAME" - -# chmod kubeconfig so it doesn't complain all the time -chmod 755 ~/.kube/config - -# Sanity check that kubectl is working. -echo "${STEP_BREADCRUMB} k8s sanity check" -kubectl get pods - -case $STAGE in - core-docker) - ;; - serving-docker) - ;; - jupyter-docker) - ;; - jobservice-docker) - ;; - ci-docker) - ;; - e2e-test-emr) - # EMR test - runs in default namespace. - - # Copy cluster config template generated for us by terraform. - aws s3 cp "${EMR_TEMPLATE_YML}" emr_cluster.yaml - - # Delete old helm release and PVCs - helm uninstall cicd -n default || true - k8s_cleanup "feast-release" default - - # Create cluster OR get existing EMR cluster id. In the latter case, clean up any steps - # already running there from previous test runs. - echo "${STEP_BREADCRUMB} Creating EMR cluster, this can take up 10 minutes." - CLUSTER_ID=$(time emr_cluster.py --template emr_cluster.yaml ensure --cleanup) - - # Get (any) node IP. EMR will use this to connect to Kafka and Redis. We make them - # available to the EMR job by exposing them as NodePort services. - NODE_IP=$(kubectl get nodes -o custom-columns=Name:.metadata.name | tail -n1) - - # Helm install everything. - # - # This may occasionally run into "provided port is already allocated" error due to - # https://github.com/kubernetes/kubernetes/issues/85894 - helm_install cicd "$DOCKER_REPOSITORY" "$GIT_TAG" default \ - --set "redis.master.service.type=NodePort" \ - --set "redis.master.service.nodePort=32379" \ - --set "kafka.externalAccess.service.type=NodePort" \ - --set "kafka.externalAccess.enabled=true" \ - --set "kafka.externalAccess.service.nodePorts[0]=30092" \ - --set "kafka.externalAccess.service.domain=${NODE_IP}" \ - --set "kafka.service.externalPort=30094" - - # Run the test suite as a one-off pod. We could also run it here, in the codebuild container - # itself, but that'd require more networking setup to make feast services available - # outside k8s cluster. - kubectl delete pod ci-test-runner 2>/dev/null || true - - echo "${STEP_BREADCRUMB} Running the test suite" - time kubectl run --rm -i ci-test-runner \ - --restart=Never \ - --image="${DOCKER_REPOSITORY}/feast-ci:latest" \ - --env="CLUSTER_ID=$CLUSTER_ID" \ - --env="STAGING_PATH=$STAGING_PATH" \ - --env="NODE_IP=$NODE_IP" \ - -- \ - bash -c "mkdir src && cd src && git clone --recursive $CODEBUILD_SOURCE_REPO_URL && cd feast* && git config remote.origin.fetch '+refs/pull/*:refs/remotes/origin/pull/*' && git fetch -q && git checkout $CODEBUILD_RESOLVED_SOURCE_VERSION && ./infra/scripts/setup-e2e-env-aws.sh && ./infra/scripts/test-end-to-end-aws.sh" - - ;; - e2e-test-sparkop) - # spark k8s test - runs in sparkop namespace (so it doesn't interfere with a concurrently - # running EMR test). - NAMESPACE=sparkop - RELEASE=sparkop - - # Clean up old release - helm uninstall $RELEASE -n $NAMESPACE || true - k8s_cleanup "feast-release" "$NAMESPACE" - - # Helm install everything in a namespace - helm_install "$RELEASE" "${DOCKER_REPOSITORY}" "${GIT_TAG}" "$NAMESPACE" - - # Delete old test running pod if it exists - kubectl delete pod -n "$NAMESPACE" ci-test-runner 2>/dev/null || true - - # Delete all sparkapplication resources that may be left over from the previous test runs. - kubectl delete sparkapplication --all -n "$NAMESPACE" || true - - # Make sure the test pod has permissions to create sparkapplication resources - setup_sparkop_role - - # Run the test suite as a one-off pod. - echo "${STEP_BREADCRUMB} Running the test suite" - if ! time kubectl run --rm -n "$NAMESPACE" -i ci-test-runner \ - --restart=Never \ - --image="${DOCKER_REPOSITORY}/feast-ci:latest" \ - --env="STAGING_PATH=$STAGING_PATH" \ - -- \ - bash -c "mkdir src && cd src && git clone --recursive $CODEBUILD_SOURCE_REPO_URL && cd feast* && git config remote.origin.fetch '+refs/pull/*:refs/remotes/origin/pull/*' && git fetch -q && git checkout $CODEBUILD_RESOLVED_SOURCE_VERSION && ./infra/scripts/setup-e2e-env-sparkop.sh && ./infra/scripts/test-end-to-end-sparkop.sh" ; then - - readarray -t CRASHED_PODS < <(kubectl get pods --no-headers=true --namespace sparkop | grep Error | awk '{ print $1 }') - - for POD in "${CRASHED_PODS[@]}"; do - echo "Logs from crashed pod $POD:" - kubectl logs --namespace sparkop "$POD" - done - fi - - ;; - cleanup) - emr_cluster.py --template emr_cluster.yaml destroy - ;; - *) - echo "Unknown stage $STAGE" - ;; -esac - -echo "########## Stage $STAGE took $SECONDS seconds ###########" diff --git a/infra/scripts/codebuild_runner.py b/infra/scripts/codebuild_runner.py deleted file mode 100755 index 382ce949..00000000 --- a/infra/scripts/codebuild_runner.py +++ /dev/null @@ -1,200 +0,0 @@ -#!/usr/bin/env python - -# This is a thin wrapper for AWS Codebuild API to kick off a build, wait for it to finish, -# and tail build logs while it is running. - -import os -import json -from typing import Dict, Any, List, Optional, AsyncGenerator -from datetime import datetime -import asyncio -import sys -import argparse -import boto3 -from botocore.config import Config - - -class LogTailer: - """ A simple cloudwatch log tailer. """ - - _next_token: Optional[str] - - def __init__(self, client, log_group: str, log_stream: str): - self._client = client - self._next_token = None - self._log_group = log_group - self._log_stream = log_stream - - def _get_log_events_args(self) -> Dict[str, Any]: - res = dict( - logGroupName=self._log_group, - logStreamName=self._log_stream, - limit=100, - startFromHead=True, - ) - if self._next_token: - res["nextToken"] = self._next_token - return res - - async def tail_chunk(self) -> List[Dict[str, str]]: - max_sleep = 5.0 - SLEEP_TIME = 0.5 - - while max_sleep > 0: - resp = self._client.get_log_events(**self._get_log_events_args()) - events = resp["events"] - self._next_token = resp.get("nextForwardToken") - if events: - return events - else: - max_sleep -= SLEEP_TIME - await asyncio.sleep(SLEEP_TIME) - else: - return [] - - async def read_all_chunks(self) -> AsyncGenerator[List[Dict[str, str]], None]: - while True: - resp = self._client.get_log_events(**self._get_log_events_args()) - events = resp["events"] - self._next_token = resp.get("nextForwardToken") - if events: - yield events - else: - return - - -async def _wait_build_state( - client, build_id, desired_phase: Optional[str], desired_states: List[str] -) -> Dict[str, Any]: - """ Wait until the build is in one of the desired states, or in the desired phase. """ - while True: - resp = client.batch_get_builds(ids=[build_id]) - assert len(resp["builds"]) == 1 - build = resp["builds"][0] - if build["buildStatus"] in desired_states: - return build - for phase in build["phases"]: - if desired_phase and (phase["phaseType"] == desired_phase): - return build - - await asyncio.sleep(2) - - -def print_log_event(event) -> None: - print( - str(datetime.fromtimestamp(event["timestamp"] / 1000.0)), - event["message"], - end="", - ) - - -async def main() -> None: - parser = argparse.ArgumentParser(description="Process some integers.") - parser.add_argument( - "--project-name", default="feast-ci-project", type=str, help="Project name" - ) - parser.add_argument( - "--source-location", - type=str, - help="Source location, e.g. https://github.com/feast/feast.git", - ) - parser.add_argument( - "--source-version", type=str, help="Source version, e.g. master" - ) - parser.add_argument( - "--location-from-prow", action='store_true', help="Infer source location and version from prow environment variables" - ) - args = parser.parse_args() - - if args.location_from_prow: - job_spec = json.loads(os.getenv('JOB_SPEC', '')) - source_location = job_spec['refs']['repo_link'] - source_version = source_version_from_prow_job_spec(job_spec) - else: - source_location = args.source_location - source_version = args.source_version - - await run_build( - project_name=args.project_name, - source_location=source_location, - source_version=source_version, - ) - -def source_version_from_prow_job_spec(job_spec: Dict[str, Any]) -> str: - pull = job_spec['refs']['pulls'][0] - return f'refs/pull/{pull["number"]}/head^{{{pull["sha"]}}}' - -async def run_build(project_name: str, source_version: str, source_location: str): - print(f"Building {project_name} at {source_version}", file=sys.stderr) - - config = Config( - retries = { - 'max_attempts': 10, - } - ) - - logs_client = boto3.client("logs", region_name="us-west-2", config=config) - codebuild_client = boto3.client("codebuild", region_name="us-west-2") - - print("Submitting the build..", file=sys.stderr) - build_resp = codebuild_client.start_build( - projectName=project_name, - sourceLocationOverride=source_location, - sourceVersion=source_version, - ) - - build_id = build_resp["build"]["id"] - - try: - print( - "Waiting for the INSTALL phase to start before tailing the log", - file=sys.stderr, - ) - build = await _wait_build_state( - codebuild_client, - build_id, - desired_phase="INSTALL", - desired_states=["SUCCEEDED", "FAILED", "STOPPED", "TIMED_OUT", "FAULT"], - ) - - if build["buildStatus"] != "IN_PROGRESS": - print( - f"Build failed before install phase: {build['buildStatus']}", - file=sys.stderr, - ) - sys.exit(1) - - log_tailer = LogTailer( - logs_client, - log_stream=build["logs"]["streamName"], - log_group=build["logs"]["groupName"], - ) - - waiter_task = asyncio.get_event_loop().create_task( - _wait_build_state( - codebuild_client, - build_id, - desired_phase=None, - desired_states=["SUCCEEDED", "FAILED", "STOPPED", "TIMED_OUT", "FAULT"], - ) - ) - - while not waiter_task.done(): - events = await log_tailer.tail_chunk() - for event in events: - print_log_event(event) - - build_status = waiter_task.result()["buildStatus"] - if build_status == "SUCCEEDED": - print(f"Build {build_status}", file=sys.stderr) - else: - print(f"Build {build_status}", file=sys.stderr) - sys.exit(1) - except KeyboardInterrupt: - print(f"Stopping build {build_id}", file=sys.stderr) - codebuild_client.stop_build(id=build_id) - - -if __name__ == "__main__": - loop = asyncio.get_event_loop() - loop.run_until_complete(main()) diff --git a/infra/scripts/publish-java-sdk.sh b/infra/scripts/publish-java-sdk.sh deleted file mode 100755 index 91123c8d..00000000 --- a/infra/scripts/publish-java-sdk.sh +++ /dev/null @@ -1,72 +0,0 @@ -#!/usr/bin/env bash - -set -e -set -o pipefail - -GPG_KEY_IMPORT_DIR=/etc/gpg - -usage() -{ - echo "usage: publish-java-sdk.sh - - --revision Value for the revision e.g. '0.2.3' - --gpg-key-import-dir Directory containing existing GPG keys to import. - The directory should contain these 2 files: - - public-key - - private-key - The default value is '/etc/gpg' - - This script assumes the GPG private key is protected by a passphrase. - The passphrase can be specified in \$HOME/.m2/settings.xml. In the same xml - file, credentials to upload releases to Sonatype must also be provided. - - # Example settings: ~/.m2/settings.xml - - - - ossrh - SONATYPE_USER - SONATYPE_PASSWORD - - - - - ossrh - - GPG_PASSPHRASE - - - - -" -} - -while [ "$1" != "" ]; do - case "$1" in - --revision ) REVISION="$2"; shift;; - --gpg-key-import-dir ) GPG_KEY_IMPORT_DIR="$2"; shift;; - -h | --help ) usage; exit;; - * ) usage; exit 1 - esac - shift -done - -if [ -z $REVISION ]; then usage; exit 1; fi - -echo "============================================================" -echo "Checking Maven and GPG versions" -echo "============================================================" -mvn --version -echo "" -gpg --version - -echo "============================================================" -echo "Importing GPG keys" -echo "============================================================" -gpg --import --batch --yes $GPG_KEY_IMPORT_DIR/public-key -gpg --import --batch --yes $GPG_KEY_IMPORT_DIR/private-key - -echo "============================================================" -echo "Deploying Java SDK with revision: $REVISION" -echo "============================================================" -mvn --projects datatypes/java,sdk/java -Drevision=$REVISION --batch-mode clean deploy diff --git a/infra/scripts/publish-python-sdk.sh b/infra/scripts/publish-python-sdk.sh deleted file mode 100755 index 582d9072..00000000 --- a/infra/scripts/publish-python-sdk.sh +++ /dev/null @@ -1,47 +0,0 @@ -#!/usr/bin/env bash - -set -e -set -o pipefail - -usage() -{ - echo "usage: publish-python-sdk.sh - - --directory-path absolute path to the python package, this directory - should contain 'setup.py' file - - --repository the repository name where the package will be uploaded, - check your .pypirc configuration file for the list of - valid repositories, usually it's 'pypi' or 'testpypi' -" -} - -while [ "$1" != "" ]; do - case "$1" in - --directory-path ) DIRECTORY_PATH="$2"; shift;; - --repository ) REPOSITORY="$2"; shift;; - -h | --help ) usage; exit;; - * ) usage; exit 1 - esac - shift -done - -if [ -z $DIRECTORY_PATH ]; then usage; exit 1; fi -if [ -z $REPOSITORY ]; then usage; exit 1; fi - -ORIGINAL_DIR=$PWD -cd $DIRECTORY_PATH - -echo "============================================================" -echo "Generating distribution archives" -echo "============================================================" -python3 -m pip install --user --upgrade setuptools wheel -python3 setup.py sdist bdist_wheel - -echo "============================================================" -echo "Uploading distribution archives" -echo "============================================================" -python3 -m pip install --user --upgrade twine -python3 -m twine upload --repository $REPOSITORY dist/* - -cd $ORIGINAL_DIR diff --git a/infra/scripts/run-minikube-test.sh b/infra/scripts/run-minikube-test.sh deleted file mode 100755 index 4c9a1344..00000000 --- a/infra/scripts/run-minikube-test.sh +++ /dev/null @@ -1,38 +0,0 @@ -#!/bin/bash - -set -euo pipefail - -NAMESPACE=sparkop -JOB_NAME=test-runner - -# Delete all sparkapplication resources that may be left over from the previous test runs. -kubectl delete sparkapplication --all -n sparkop || true - -JOB_SPEC=$(dirname $0)/test_job.yaml - -# Delete previous instance of the job if it exists -kubectl delete -n ${NAMESPACE} "job/$JOB_NAME" 2>/dev/null || true - -# Create the job -kubectl apply -n ${NAMESPACE} -f "$JOB_SPEC" - -# Wait for job to have a pod. -for i in {1..10} -do - POD=$(kubectl get pods -n ${NAMESPACE} --selector=job-name=$JOB_NAME --output=jsonpath='{.items[0].metadata.name}') - if [ ! -z "$POD" ]; then - break - else - sleep 1 - fi -done - -echo "Waiting for pod to be ready:" -kubectl wait -n ${NAMESPACE} --for=condition=ContainersReady "pod/$POD" --timeout=60s || true - -echo "Job output:" -kubectl logs -n ${NAMESPACE} -f "job/$JOB_NAME" - -# Can't wait for both conditions at once, so wait for complete first then wait for failure -kubectl wait -n ${NAMESPACE} --for=condition=complete "job/$JOB_NAME" --timeout=60s && exit 0 -kubectl wait -n ${NAMESPACE} --for=condition=failure "job/$JOB_NAME" --timeout=60s && exit 1 diff --git a/infra/scripts/setup-e2e-env-aws.sh b/infra/scripts/setup-e2e-env-aws.sh deleted file mode 100755 index 2ab15494..00000000 --- a/infra/scripts/setup-e2e-env-aws.sh +++ /dev/null @@ -1,14 +0,0 @@ -#!/bin/bash - -python -m pip install --upgrade pip==20.2 setuptools wheel - -make install-python - -python -m pip install -qr tests/requirements.txt - -# Using mvn -q to make it less verbose. This step happens after docker containers were -# succesfully built so it should be unlikely to fail, therefore we likely won't need detailed logs. -echo "########## Building ingestion jar" -TIMEFORMAT='########## took %R seconds' - -time make build-ingestion-jar-no-tests REVISION=develop MAVEN_EXTRA_OPTS="-q --no-transfer-progress" diff --git a/infra/scripts/setup-e2e-env-gcp.sh b/infra/scripts/setup-e2e-env-gcp.sh deleted file mode 100755 index 08083d0c..00000000 --- a/infra/scripts/setup-e2e-env-gcp.sh +++ /dev/null @@ -1,36 +0,0 @@ -#!/usr/bin/env bash - -# GCloud, kubectl, helm should be already installed -# And kubernetes cluster already configured - -test -z ${GCLOUD_REGION} && GCLOUD_REGION="us-central1" -test -z ${GCLOUD_NETWORK} && GCLOUD_NETWORK="default" -test -z ${GCLOUD_SUBNET} && GCLOUD_SUBNET="default" - - -feast_kafka_ip_name="feast-kafka" -feast_redis_1_ip_name="feast-redis-1" -feast_redis_2_ip_name="feast-redis-2" -feast_redis_3_ip_name="feast-redis-3" - -helm repo add bitnami https://charts.bitnami.com/bitnami - -gcloud compute addresses create \ - $feast_kafka_ip_name $feast_redis_1_ip_name $feast_redis_2_ip_name $feast_redis_3_ip_name \ - --region ${GCLOUD_REGION} --subnet ${GCLOUD_SUBNET} - -export feast_kafka_ip=$(gcloud compute addresses describe $feast_kafka_ip_name --region=${GCLOUD_REGION} --format "value(address)") -export feast_redis_1_ip=$(gcloud compute addresses describe $feast_redis_1_ip_name --region=${GCLOUD_REGION} --format "value(address)") -export feast_redis_2_ip=$(gcloud compute addresses describe $feast_redis_2_ip_name --region=${GCLOUD_REGION} --format "value(address)") -export feast_redis_3_ip=$(gcloud compute addresses describe $feast_redis_3_ip_name --region=${GCLOUD_REGION} --format "value(address)") - - -envsubst '$feast_kafka_ip' < helm/kafka-values.tpl.yaml > helm/kafka-values.yaml -envsubst '$feast_redis_1_ip,$feast_redis_2_ip,$feast_redis_3_ip' < helm/redis-cluster-values.tpl.yaml > helm/redis-cluster-values.yaml - -helm install e2e-kafka bitnami/kafka \ - --values helm/kafka-values.yaml --namespace infra --create-namespace - -helm install e2e-redis-cluster bitnami/redis-cluster \ - --values helm/redis-cluster-values.yaml --namespace infra \ - --create-namespace \ No newline at end of file diff --git a/infra/scripts/setup-e2e-local.sh b/infra/scripts/setup-e2e-local.sh deleted file mode 100644 index 3432673d..00000000 --- a/infra/scripts/setup-e2e-local.sh +++ /dev/null @@ -1,26 +0,0 @@ -#!/bin/bash -set -euo pipefail - -STEP_BREADCRUMB='~~~~~~~~' - -pushd "$(dirname $0)" -source k8s-common-functions.sh - -# spark k8s test - runs in sparkop namespace (so it doesn't interfere with a concurrently -# running EMR test). -NAMESPACE=sparkop -RELEASE=sparkop - -# Clean up old release -k8s_cleanup "$RELEASE" "$NAMESPACE" - -# Helm install everything in a namespace -helm_install "$RELEASE" "${DOCKER_REPOSITORY}" "${GIT_TAG}" "$NAMESPACE" --create-namespace - -# Delete all sparkapplication resources that may be left over from the previous test runs. -kubectl delete sparkapplication --all -n "$NAMESPACE" || true - -# Make sure the test pod has permissions to create sparkapplication resources -setup_sparkop_role - -echo "DONE" \ No newline at end of file diff --git a/infra/scripts/test-core-ingestion.sh b/infra/scripts/test-core-ingestion.sh deleted file mode 100755 index d3b42926..00000000 --- a/infra/scripts/test-core-ingestion.sh +++ /dev/null @@ -1,25 +0,0 @@ -#!/usr/bin/env bash - -apt-get -qq update -apt-get -y install build-essential - -make lint-java - -infra/scripts/download-maven-cache.sh \ - --archive-uri gs://feast-templocation-kf-feast/.m2.2019-10-24.tar \ - --output-dir /root/ - -# Core depends on Ingestion so they are tested together -# Skip Maven enforcer: https://stackoverflow.com/questions/50647223/maven-enforcer-issue-when-running-from-reactor-level -mvn --projects core,ingestion --batch-mode --define skipTests=true \ - --define enforcer.skip=true clean install -mvn --projects core,ingestion --define enforcer.skip=true test -TEST_EXIT_CODE=$? - -# Default artifact location setting in Prow jobs -LOGS_ARTIFACT_PATH=/logs/artifacts -mkdir -p ${LOGS_ARTIFACT_PATH}/surefire-reports -cp core/target/surefire-reports/* ${LOGS_ARTIFACT_PATH}/surefire-reports/ -cp ingestion/target/surefire-reports/* ${LOGS_ARTIFACT_PATH}/surefire-reports/ - -exit ${TEST_EXIT_CODE} \ No newline at end of file diff --git a/infra/scripts/test-docker-compose.sh b/infra/scripts/test-docker-compose.sh deleted file mode 100755 index e2454f9f..00000000 --- a/infra/scripts/test-docker-compose.sh +++ /dev/null @@ -1,73 +0,0 @@ -#!/usr/bin/env bash - -set -e - -echo " -============================================================ -Running Docker Compose tests with pytest at 'tests/e2e' -============================================================ -" -LATEST_GH_COMMIT_SHA=$1 - -clean_up () { - ARG=$? - - # Shut down docker-compose images - - docker-compose down - - exit $ARG -} - -trap clean_up EXIT - -export PROJECT_ROOT_DIR=$(git rev-parse --show-toplevel) -export COMPOSE_INTERACTIVE_NO_CLI=1 - -# Create Docker Compose configuration file -cd ${PROJECT_ROOT_DIR}/infra/docker-compose/ -cp .env.sample .env - -# Replace FEAST_VERSION with latest github image SHA -export FEAST_VERSION=$LATEST_GH_COMMIT_SHA -echo "Testing docker-compose setup with version SHA, $FEAST_VERSION." - -# Start Docker Compose containers -docker-compose up -d - -# Get Jupyter container IP address -export JUPYTER_DOCKER_CONTAINER_IP_ADDRESS=$(docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' feast_jupyter_1) - -# Print Jupyter container information -docker inspect feast_jupyter_1 - -# Wait for Jupyter Notebook Container to come online -${PROJECT_ROOT_DIR}/infra/scripts/wait-for-it.sh ${JUPYTER_DOCKER_CONTAINER_IP_ADDRESS}:8888 --timeout=60 - -# Get Feast Core container IP address -export FEAST_CORE_CONTAINER_IP_ADDRESS=$(docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' feast_core_1) - -# Wait for Feast Core to be ready -${PROJECT_ROOT_DIR}/infra/scripts/wait-for-it.sh ${FEAST_CORE_CONTAINER_IP_ADDRESS}:6565 --timeout=120 - -# Get Feast Online Serving container IP address -export FEAST_ONLINE_SERVING_CONTAINER_IP_ADDRESS=$(docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' feast_online_serving_1) - -# Wait for Feast Online Serving to be ready -${PROJECT_ROOT_DIR}/infra/scripts/wait-for-it.sh ${FEAST_ONLINE_SERVING_CONTAINER_IP_ADDRESS}:6566 --timeout=120 - - -# Get Feast Job Service container IP address -export FEAST_JOB_SERVICE_CONTAINER_IP_ADDRESS=$(docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' feast_jobservice_1) - -# Wait for Feast Job Service to be ready -${PROJECT_ROOT_DIR}/infra/scripts/wait-for-it.sh ${FEAST_JOB_SERVICE_CONTAINER_IP_ADDRESS}:6568 --timeout=120 - -# Run e2e tests for Redis -docker exec \ - -e FEAST_VERSION=${FEAST_VERSION} \ - -e DISABLE_SERVICE_FIXTURES=true \ - -e DISABLE_FEAST_SERVICE_FIXTURES=true \ - --user root \ - feast_jupyter_1 bash \ - -c 'cd /feast/tests && python -m pip install -r requirements.txt && pytest e2e/ --ingestion-jar https://storage.googleapis.com/feast-jobs/spark/ingestion/feast-ingestion-spark-${FEAST_VERSION}.jar --redis-url redis:6379 --core-url core:6565 --serving-url online_serving:6566 --job-service-url jobservice:6568 --staging-path file:///shared/staging/ --kafka-brokers kafka:9092 --statsd-url prometheus_statsd:9125 --prometheus-url prometheus_statsd:9102 --feast-version develop' diff --git a/infra/scripts/test-end-to-end-local.sh b/infra/scripts/test-end-to-end-local.sh deleted file mode 100755 index cf57d8a2..00000000 --- a/infra/scripts/test-end-to-end-local.sh +++ /dev/null @@ -1,69 +0,0 @@ -#!/usr/bin/env bash - -set -euo pipefail - -export DISABLE_FEAST_SERVICE_FIXTURES=1 -export DISABLE_SERVICE_FIXTURES=1 - -export FEAST_SPARK_K8S_NAMESPACE=sparkop -export FEAST_S3_ENDPOINT_URL=http://minio.minio.svc.cluster.local:9000 - -# Used by tests -export AWS_S3_ENDPOINT_URL=http://minio.minio.svc.cluster.local:9000 - -cat << SPARK_CONF_END >/tmp/spark_conf.yml -apiVersion: "sparkoperator.k8s.io/v1beta2" -kind: SparkApplication -metadata: - namespace: default -spec: - type: Scala - mode: cluster - image: "gcr.io/kf-feast/spark-py:v3.0.1" - imagePullPolicy: Always - sparkVersion: "3.0.1" - timeToLiveSeconds: 3600 - pythonVersion: "3" - sparkConf: - "spark.hadoop.fs.s3a.endpoint": http://minio.minio.svc.cluster.local:9000 - "spark.hadoop.fs.s3a.path.style.access": "true" - "spark.hadoop.fs.s3a.access.key": ${AWS_ACCESS_KEY_ID} - "spark.hadoop.fs.s3a.secret.key": ${AWS_SECRET_ACCESS_KEY} - restartPolicy: - type: Never - volumes: - - name: "test-volume" - hostPath: - path: "/tmp" - type: Directory - driver: - cores: 1 - coreLimit: "1200m" - memory: "512m" - labels: - version: 3.0.1 - serviceAccount: spark - volumeMounts: - - name: "test-volume" - mountPath: "/tmp" - executor: - cores: 1 - instances: 1 - memory: "512m" - labels: - version: 3.0.1 - volumeMounts: - - name: "test-volume" - mountPath: "/tmp" -SPARK_CONF_END -export FEAST_SPARK_K8S_JOB_TEMPLATE_PATH=/tmp/spark_conf.yml - -PYTHONPATH=sdk/python pytest tests/e2e/ \ - --feast-version develop \ - --core-url sparkop-feast-core:6565 \ - --serving-url sparkop-feast-online-serving:6566 \ - --env k8s \ - --staging-path s3a://feast-staging \ - --redis-url sparkop-redis-master.sparkop.svc.cluster.local:6379 \ - --kafka-brokers sparkop-kafka.sparkop.svc.cluster.local:9092 \ - -m "not bq and not k8s" \ No newline at end of file diff --git a/infra/scripts/test-end-to-end-sparkop.sh b/infra/scripts/test-end-to-end-sparkop.sh index 0c6f7637..68edf58b 100755 --- a/infra/scripts/test-end-to-end-sparkop.sh +++ b/infra/scripts/test-end-to-end-sparkop.sh @@ -65,6 +65,4 @@ kubectl run -n "$NAMESPACE" -i ci-test-runner \ --restart=Never \ --image="python:3.8" \ --env="FEAST_TELEMETRY=false" \ - --env="DISABLE_FEAST_SERVICE_FIXTURES=1" \ - --env="DISABLE_SERVICE_FIXTURES=1" \ -- bash -c "$CMD" diff --git a/infra/scripts/test-golang-sdk.sh b/infra/scripts/test-golang-sdk.sh deleted file mode 100755 index 666f6c12..00000000 --- a/infra/scripts/test-golang-sdk.sh +++ /dev/null @@ -1,17 +0,0 @@ -#!/usr/bin/env bash - -set -o pipefail - -make lint-go - -cd sdk/go -go test -v 2>&1 | tee /tmp/test_output -TEST_EXIT_CODE=$? - -# Default artifact location setting in Prow jobs -LOGS_ARTIFACT_PATH=/logs/artifacts - -go get -u github.com/jstemmer/go-junit-report -cat /tmp/test_output | ${GOPATH}/bin/go-junit-report > ${LOGS_ARTIFACT_PATH}/golang-sdk-test-report.xml - -exit ${TEST_EXIT_CODE} \ No newline at end of file diff --git a/infra/scripts/test-integration.sh b/infra/scripts/test-integration.sh deleted file mode 100755 index ad5dd29a..00000000 --- a/infra/scripts/test-integration.sh +++ /dev/null @@ -1,8 +0,0 @@ -#!/usr/bin/env bash - -python -m pip install --upgrade pip setuptools wheel -make install-python -python -m pip install -qr tests/requirements.txt - -export FEAST_TELEMETRY="False" -pytest tests/integration --dataproc-cluster-name feast-e2e --dataproc-project kf-feast --dataproc-region us-central1 --dataproc-staging-location gs://feast-templocation-kf-feast diff --git a/infra/scripts/test-java-sdk.sh b/infra/scripts/test-java-sdk.sh deleted file mode 100755 index 0731b779..00000000 --- a/infra/scripts/test-java-sdk.sh +++ /dev/null @@ -1,13 +0,0 @@ -#!/usr/bin/env bash - -# Skip Maven enforcer: https://stackoverflow.com/questions/50647223/maven-enforcer-issue-when-running-from-reactor-level -mvn --projects sdk/java --batch-mode --define skipTests=true \ - --define enforcer.skip=true clean install -mvn --projects sdk/java --define enforcer.skip=true test -TEST_EXIT_CODE=$? - -# Default artifact location setting in Prow jobs -LOGS_ARTIFACT_PATH=/logs/artifacts -cp -r sdk/java/target/surefire-reports ${LOGS_ARTIFACT_PATH}/surefire-reports - -exit ${TEST_EXIT_CODE} \ No newline at end of file diff --git a/infra/scripts/test-load.sh b/infra/scripts/test-load.sh deleted file mode 100755 index 51a24691..00000000 --- a/infra/scripts/test-load.sh +++ /dev/null @@ -1,116 +0,0 @@ -#!/usr/bin/env bash - -set -e - -echo " -============================================================ -Running Load Tests -============================================================ -" - -clean_up() { - ARG=$? - - # Shut down docker-compose images - cd "${PROJECT_ROOT_DIR}"/infra/docker-compose - - docker-compose down - - exit $ARG -} - -# Get Feast project repository root and scripts directory -export PROJECT_ROOT_DIR=$(git rev-parse --show-toplevel) -export SCRIPTS_DIR=${PROJECT_ROOT_DIR}/infra/scripts -export COMPOSE_INTERACTIVE_NO_CLI=1 -source ${SCRIPTS_DIR}/setup-common-functions.sh - -if [ -z "$1" ] ; then - echo "No SHA/FEAST_VERSION provided as argument, using local HEAD"; - FEAST_VERSION=$(git rev-parse HEAD); - export FEAST_VERSION -else - echo "Using ${1} as SHA/FEAST_VERSION to test"; - FEAST_VERSION=${1} - export FEAST_VERSION -fi - -wait_for_docker_image gcr.io/kf-feast/feast-core:"${FEAST_VERSION}" -wait_for_docker_image gcr.io/kf-feast/feast-jobcontroller:"${FEAST_VERSION}" -wait_for_docker_image gcr.io/kf-feast/feast-serving:"${FEAST_VERSION}" -wait_for_docker_image gcr.io/kf-feast/feast-jupyter:"${FEAST_VERSION}" - -# Clean up Docker Compose if failure -trap clean_up EXIT - -# Create Docker Compose configuration file -cd "${PROJECT_ROOT_DIR}"/infra/docker-compose/ -cp .env.sample .env - -# Start Docker Compose containers -FEAST_VERSION=${FEAST_VERSION} docker-compose up -d - -# Get Jupyter container IP address -export JUPYTER_DOCKER_CONTAINER_IP_ADDRESS=$(docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' feast_jupyter_1) - -# Print Jupyter container information -docker inspect feast_jupyter_1 -docker logs feast_jupyter_1 - -# Wait for Jupyter Notebook Container to come online -"${PROJECT_ROOT_DIR}"/infra/scripts/wait-for-it.sh ${JUPYTER_DOCKER_CONTAINER_IP_ADDRESS}:8888 --timeout=60 - -# Get Feast Core container IP address -export FEAST_CORE_CONTAINER_IP_ADDRESS=$(docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' feast_core_1) - -# Wait for Feast Core to be ready -"${PROJECT_ROOT_DIR}"/infra/scripts/wait-for-it.sh ${FEAST_CORE_CONTAINER_IP_ADDRESS}:6565 --timeout=120 - -# Get Feast Job Controller container IP address -export FEAST_JOB_CONTROLLER_CONTAINER_IP_ADDRESS=$(docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' feast_jobcontroller_1) - -# Wait for Feast Job Controller to be ready -"${PROJECT_ROOT_DIR}"/infra/scripts/wait-for-it.sh ${FEAST_JOB_CONTROLLER_CONTAINER_IP_ADDRESS}:6570 --timeout=120 - -# Get Feast Online Serving container IP address -export FEAST_ONLINE_SERVING_CONTAINER_IP_ADDRESS=$(docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' feast_online_serving_1) - -# Wait for Feast Online Serving to be ready -"${PROJECT_ROOT_DIR}"/infra/scripts/wait-for-it.sh ${FEAST_ONLINE_SERVING_CONTAINER_IP_ADDRESS}:6566 --timeout=120 - -# Ingest data into Feast -pip install --user matplotlib feast pytz --upgrade -python "${PROJECT_ROOT_DIR}"/tests/load/ingest.py "${FEAST_CORE_CONTAINER_IP_ADDRESS}":6565 "${FEAST_ONLINE_SERVING_CONTAINER_IP_ADDRESS}":6566 - -# Download load test tool and proxy -cd $(mktemp -d) -wget -c https://github.com/feast-dev/feast-load-test-proxy/releases/download/v0.1.1/feast-load-test-proxy_0.1.1_Linux_x86_64.tar.gz -O - | tar -xz -git clone https://github.com/giltene/wrk2.git -cd wrk2 -make -cd .. -cp wrk2/wrk . - -# Start load test server -LOAD_FEAST_SERVING_HOST=${FEAST_ONLINE_SERVING_CONTAINER_IP_ADDRESS} LOAD_FEAST_SERVING_PORT=6566 ./feast-load-test-proxy & -sleep 5 - -# Run load tests -./wrk -t2 -c10 -d30s -R20 --latency http://localhost:8080/echo -./wrk -t2 -c10 -d30s -R20 --latency http://localhost:8080/send?entity_count=10 > load_test_results_1fs_13f_10e_20rps -./wrk -t2 -c10 -d30s -R50 --latency http://localhost:8080/send?entity_count=10 > load_test_results_1fs_13f_10e_50rps -./wrk -t2 -c10 -d30s -R250 --latency http://localhost:8080/send?entity_count=10 > load_test_results_1fs_13f_10e_250rps -./wrk -t2 -c10 -d30s -R20 --latency http://localhost:8080/send?entity_count=50 > load_test_results_1fs_13f_50e_20rps -./wrk -t2 -c10 -d30s -R50 --latency http://localhost:8080/send?entity_count=50 > load_test_results_1fs_13f_50e_50rps -./wrk -t2 -c10 -d30s -R250 --latency http://localhost:8080/send?entity_count=50 > load_test_results_1fs_13f_50e_250rps - -# Print load test results -cat $(ls -lah | grep load_test_results | awk '{print $9}' | tr '\n' ' ') - -# Create hdr-plot of load tests -export PLOT_FILE_NAME="load_test_graph_${FEAST_VERSION}"_$(date "+%Y%m%d-%H%M%S").png -python $PROJECT_ROOT_DIR/tests/load/hdr_plot.py --output "$PLOT_FILE_NAME" --title "Load test: ${FEAST_VERSION}" $(ls -lah | grep load_test_results | awk '{print $9}' | tr '\n' ' ') - -# Persist artifact -mkdir -p "${PROJECT_ROOT_DIR}"/load-test-output/ -cp -r load_test_* "${PROJECT_ROOT_DIR}"/load-test-output/ \ No newline at end of file diff --git a/infra/scripts/test-python-sdk.sh b/infra/scripts/test-python-sdk.sh deleted file mode 100755 index 7c264eed..00000000 --- a/infra/scripts/test-python-sdk.sh +++ /dev/null @@ -1,14 +0,0 @@ -#!/usr/bin/env bash - -set -e - -# Default artifact location setting in Prow jobs -LOGS_ARTIFACT_PATH=/logs/artifacts - -pip install -r sdk/python/requirements-ci.txt -make compile-protos-python -make lint-python - -cd sdk/python/ -pip install -e . -pytest --junitxml=${LOGS_ARTIFACT_PATH}/python-sdk-test-report.xml diff --git a/infra/scripts/test-serving.sh b/infra/scripts/test-serving.sh deleted file mode 100755 index ce9dc0a8..00000000 --- a/infra/scripts/test-serving.sh +++ /dev/null @@ -1,14 +0,0 @@ -#!/usr/bin/env bash - -infra/scripts/download-maven-cache.sh \ - --archive-uri gs://feast-templocation-kf-feast/.m2.2019-10-24.tar \ - --output-dir /root/ - -mvn --batch-mode --also-make --projects serving test -TEST_EXIT_CODE=$? - -# Default artifact location setting in Prow jobs -LOGS_ARTIFACT_PATH=/logs/artifacts -cp -r serving/target/surefire-reports ${LOGS_ARTIFACT_PATH}/surefire-reports - -exit ${TEST_EXIT_CODE} diff --git a/infra/scripts/test_job.yaml b/infra/scripts/test_job.yaml deleted file mode 100644 index 4995b7d4..00000000 --- a/infra/scripts/test_job.yaml +++ /dev/null @@ -1,35 +0,0 @@ -apiVersion: batch/v1 -kind: Job -metadata: - name: test-runner - namespace: sparkop -spec: - backoffLimit: 1 - template: - spec: - containers: - - name: ubuntu - image: feast:local - command: ["bash", "-c", "./infra/scripts/test-end-to-end-local.sh"] - imagePullPolicy: Never - args: - - bash - stdin: true - stdinOnce: true - tty: true - env: - - name: AWS_ACCESS_KEY_ID - valueFrom: - secretKeyRef: - name: minio - key: accesskey - - name: AWS_SECRET_ACCESS_KEY - valueFrom: - secretKeyRef: - name: minio - key: secretkey - - name: AWS_DEFAULT_REGION - value: us-east-1 - - name: AWS_S3_SIGNATURE_VERSION - value: s3v4 - restartPolicy: Never diff --git a/tests/e2e/conftest.py b/tests/e2e/conftest.py index d63d311c..c04f82e7 100644 --- a/tests/e2e/conftest.py +++ b/tests/e2e/conftest.py @@ -1,5 +1,3 @@ -import os - import pytest @@ -9,28 +7,11 @@ def pytest_addoption(parser): parser.addoption("--job-service-url", action="store", default="localhost:6568") parser.addoption("--kafka-brokers", action="store", default="localhost:9092") - parser.addoption( - "--env", action="store", help="local|aws|gcloud|k8s", default="local" - ) - parser.addoption("--with-job-service", action="store_true") + parser.addoption("--env", action="store", help="k8s", default="k8s") parser.addoption("--staging-path", action="store") - parser.addoption("--dataproc-cluster-name", action="store") - parser.addoption("--dataproc-region", action="store") - parser.addoption("--emr-cluster-id", action="store") - parser.addoption("--emr-region", action="store") - parser.addoption("--dataproc-project", action="store") - parser.addoption("--dataproc-executor-instances", action="store", default="2") - parser.addoption("--dataproc-executor-cores", action="store", default="2") - parser.addoption("--dataproc-executor-memory", action="store", default="2g") parser.addoption("--k8s-namespace", action="store", default="sparkop-e2e") - parser.addoption("--ingestion-jar", action="store") - parser.addoption("--redis-url", action="store", default="localhost:6379") - parser.addoption("--redis-cluster", action="store_true") - parser.addoption("--feast-version", action="store") parser.addoption("--bq-project", action="store") parser.addoption("--feast-project", action="store", default="default") - parser.addoption("--statsd-url", action="store", default="localhost:8125") - parser.addoption("--prometheus-url", action="store", default="localhost:9102") parser.addoption("--enable-auth", action="store_true") parser.addoption( "--scheduled-streaming-job", @@ -47,40 +28,6 @@ def pytest_runtest_setup(item): pytest.skip(f"test requires env in {env_names}") -from .fixtures.base import project_root, project_version # noqa -from .fixtures.client import ( # noqa - feast_client, - feast_spark_client, - global_staging_path, - ingestion_job_jar, - local_staging_path, - tfrecord_feast_client, -) - -if not os.environ.get("DISABLE_SERVICE_FIXTURES"): - from .fixtures.services import ( # noqa - kafka_port, - kafka_server, - redis_server, - statsd_server, - zookeeper_server, - ) -else: - from .fixtures.external_services import ( # type: ignore # noqa - kafka_server, - redis_server, - statsd_server, - ) - -if not os.environ.get("DISABLE_FEAST_SERVICE_FIXTURES"): - from .fixtures.feast_services import * # type: ignore # noqa - from .fixtures.services import postgres_server # noqa -else: - from .fixtures.external_services import ( # type: ignore # noqa - feast_core, - feast_serving, - feast_jobservice, - enable_auth, - ) - +from .fixtures.client import * # noqa from .fixtures.data import * # noqa +from .fixtures.external_services import * # noqa diff --git a/tests/e2e/fixtures/base.py b/tests/e2e/fixtures/base.py deleted file mode 100644 index 2aac3728..00000000 --- a/tests/e2e/fixtures/base.py +++ /dev/null @@ -1,20 +0,0 @@ -import xml.etree.ElementTree as ET -from pathlib import Path - -import pytest - - -@pytest.fixture(scope="session") -def project_root(): - # This file is %root%/tests/e2e/fixtures/base.py - return Path(__file__).parent.parent.parent.parent - - -@pytest.fixture(scope="session") -def project_version(pytestconfig, project_root): - if pytestconfig.getoption("feast_version"): - return pytestconfig.getoption("feast_version") - - pom_xml = ET.parse(project_root / "pom.xml") - root = pom_xml.getroot() - return root.find(".properties/revision").text diff --git a/tests/e2e/fixtures/client.py b/tests/e2e/fixtures/client.py index 7ffa936f..b23acd22 100644 --- a/tests/e2e/fixtures/client.py +++ b/tests/e2e/fixtures/client.py @@ -1,109 +1,32 @@ import os -import tempfile import uuid -from typing import Optional, Tuple +from typing import Tuple import pytest -from pytest_redis.executor import RedisExecutor from feast import Client from feast_spark import Client as SparkClient -from tests.e2e.fixtures.statsd_stub import StatsDServer @pytest.fixture def feast_client( pytestconfig, - ingestion_job_jar, - redis_server: RedisExecutor, - statsd_server: StatsDServer, feast_core: Tuple[str, int], feast_serving: Tuple[str, int], local_staging_path, - feast_jobservice: Optional[Tuple[str, int]], + feast_jobservice: Tuple[str, int], enable_auth, ): - if feast_jobservice is None: - job_service_env = dict() - else: - job_service_env = dict( - job_service_url=f"{feast_jobservice[0]}:{feast_jobservice[1]}" - ) - - if pytestconfig.getoption("env") == "local": - import pyspark - - return Client( - core_url=f"{feast_core[0]}:{feast_core[1]}", - serving_url=f"{feast_serving[0]}:{feast_serving[1]}", - spark_launcher="standalone", - spark_standalone_master="local", - spark_home=os.getenv("SPARK_HOME") or os.path.dirname(pyspark.__file__), - spark_ingestion_jar=ingestion_job_jar, - redis_host=redis_server.host, - redis_port=redis_server.port, - spark_staging_location=os.path.join(local_staging_path, "spark"), - historical_feature_output_location=os.path.join( - local_staging_path, "historical_output" - ), - ingestion_drop_invalid_rows=True, - statsd_enabled=True, - statsd_host=statsd_server.host, - statsd_port=statsd_server.port, - **job_service_env, - ) - - elif pytestconfig.getoption("env") == "gcloud": - c = Client( - core_url=f"{feast_core[0]}:{feast_core[1]}", - serving_url=f"{feast_serving[0]}:{feast_serving[1]}", - spark_launcher="dataproc", - dataproc_cluster_name=pytestconfig.getoption("dataproc_cluster_name"), - dataproc_project=pytestconfig.getoption("dataproc_project"), - dataproc_region=pytestconfig.getoption("dataproc_region"), - spark_staging_location=os.path.join(local_staging_path, "dataproc"), - spark_ingestion_jar=ingestion_job_jar, - redis_host=pytestconfig.getoption("redis_url").split(":")[0], - redis_port=pytestconfig.getoption("redis_url").split(":")[1], - historical_feature_output_location=os.path.join( - local_staging_path, "historical_output" - ), - ingestion_drop_invalid_rows=True, - grpc_connection_timeout=30, - enable_auth=pytestconfig.getoption("enable_auth"), - **job_service_env, - ) - elif pytestconfig.getoption("env") == "aws": - c = Client( - core_url=f"{feast_core[0]}:{feast_core[1]}", - serving_url=f"{feast_serving[0]}:{feast_serving[1]}", - spark_launcher="emr", - emr_cluster_id=pytestconfig.getoption("emr_cluster_id"), - emr_region=pytestconfig.getoption("emr_region"), - spark_staging_location=os.path.join(local_staging_path, "emr"), - emr_log_location=os.path.join(local_staging_path, "emr_logs"), - spark_ingestion_jar=ingestion_job_jar, - redis_host=pytestconfig.getoption("redis_url").split(":")[0], - redis_port=pytestconfig.getoption("redis_url").split(":")[1], - historical_feature_output_location=os.path.join( - local_staging_path, "historical_output" - ), - ingestion_drop_invalid_rows=True, - enable_auth=pytestconfig.getoption("enable_auth"), - ) - elif pytestconfig.getoption("env") == "k8s": - c = Client( - core_url=f"{feast_core[0]}:{feast_core[1]}", - serving_url=f"{feast_serving[0]}:{feast_serving[1]}", - historical_feature_output_location=os.path.join( - local_staging_path, "historical_output" - ), - spark_staging_location=os.path.join(local_staging_path, "k8s"), - enable_auth=pytestconfig.getoption("enable_auth"), - **job_service_env, - ) - else: - raise KeyError(f"Unknown environment {pytestconfig.getoption('env')}") + c = Client( + core_url=f"{feast_core[0]}:{feast_core[1]}", + serving_url=f"{feast_serving[0]}:{feast_serving[1]}", + job_service_url=f"{feast_jobservice[0]}:{feast_jobservice[1]}", + historical_feature_output_location=os.path.join( + local_staging_path, "historical_output" + ), + spark_staging_location=os.path.join(local_staging_path, "k8s"), + enable_auth=pytestconfig.getoption("enable_auth"), + ) c.set_project(pytestconfig.getoption("feast_project")) return c @@ -114,91 +37,8 @@ def feast_spark_client(feast_client: Client) -> SparkClient: return SparkClient(feast_client) -@pytest.fixture -def tfrecord_feast_client( - pytestconfig, - feast_core: Tuple[str, int], - local_staging_path, - feast_jobservice: Optional[Tuple[str, int]], - enable_auth, -): - if feast_jobservice is None: - job_service_env = dict() - else: - job_service_env = dict( - job_service_url=f"{feast_jobservice[0]}:{feast_jobservice[1]}" - ) - - if pytestconfig.getoption("env") == "local": - import pyspark - - return Client( - core_url=f"{feast_core[0]}:{feast_core[1]}", - spark_launcher="standalone", - spark_standalone_master="local", - spark_home=os.getenv("SPARK_HOME") or os.path.dirname(pyspark.__file__), - spark_staging_location=os.path.join(local_staging_path, "spark"), - historical_feature_output_format="tfrecord", - historical_feature_output_location=os.path.join( - local_staging_path, "historical_output" - ), - **job_service_env, - ) - - elif pytestconfig.getoption("env") == "gcloud": - c = Client( - core_url=f"{feast_core[0]}:{feast_core[1]}", - spark_launcher="dataproc", - dataproc_cluster_name=pytestconfig.getoption("dataproc_cluster_name"), - dataproc_project=pytestconfig.getoption("dataproc_project"), - dataproc_region=pytestconfig.getoption("dataproc_region"), - spark_staging_location=os.path.join(local_staging_path, "dataproc"), - historical_feature_output_format="tfrecord", - historical_feature_output_location=os.path.join( - local_staging_path, "historical_output" - ), - ingestion_drop_invalid_rows=True, - **job_service_env, - ) - elif pytestconfig.getoption("env") == "aws": - return Client( - core_url=f"{feast_core[0]}:{feast_core[1]}", - spark_launcher="emr", - emr_cluster_id=pytestconfig.getoption("emr_cluster_id"), - emr_region=pytestconfig.getoption("emr_region"), - spark_staging_location=os.path.join(local_staging_path, "emr"), - emr_log_location=os.path.join(local_staging_path, "emr_logs"), - historical_feature_output_format="tfrecord", - historical_feature_output_location=os.path.join( - local_staging_path, "historical_output" - ), - ) - elif pytestconfig.getoption("env") == "k8s": - return Client( - core_url=f"{feast_core[0]}:{feast_core[1]}", - spark_launcher="k8s", - spark_staging_location=os.path.join(local_staging_path, "k8s"), - historical_feature_output_format="tfrecord", - historical_feature_output_location=os.path.join( - local_staging_path, "historical_output" - ), - **job_service_env, - ) - else: - raise KeyError(f"Unknown environment {pytestconfig.getoption('env')}") - - c.set_project(pytestconfig.getoption("feast_project")) - return c - - @pytest.fixture(scope="session") def global_staging_path(pytestconfig): - if pytestconfig.getoption("env") == "local" and not pytestconfig.getoption( - "staging_path", "" - ): - tmp_path = tempfile.mkdtemp() - return f"file://{tmp_path}" - staging_path = pytestconfig.getoption("staging_path") return os.path.join(staging_path, str(uuid.uuid4())) @@ -206,16 +46,3 @@ def global_staging_path(pytestconfig): @pytest.fixture(scope="function") def local_staging_path(global_staging_path): return os.path.join(global_staging_path, str(uuid.uuid4())) - - -@pytest.fixture(scope="session") -def ingestion_job_jar(pytestconfig, project_root, project_version): - default_path = ( - project_root - / "spark" - / "ingestion" - / "target" - / f"feast-ingestion-spark-{project_version}.jar" - ) - - return pytestconfig.getoption("ingestion_jar") or default_path diff --git a/tests/e2e/fixtures/external_services.py b/tests/e2e/fixtures/external_services.py index 4edb68ac..9224492d 100644 --- a/tests/e2e/fixtures/external_services.py +++ b/tests/e2e/fixtures/external_services.py @@ -1,25 +1,14 @@ import pytest -from pytest_redis.executor import NoopRedis - -from tests.e2e.fixtures.statsd_stub import PrometheusStatsDServer __all__ = ( "feast_core", "feast_serving", - "redis_server", "kafka_server", "enable_auth", "feast_jobservice", - "statsd_server", ) -@pytest.fixture(scope="session") -def redis_server(pytestconfig): - host, port = pytestconfig.getoption("redis_url").split(":") - return NoopRedis(host, port, None) - - @pytest.fixture(scope="session") def feast_core(pytestconfig): host, port = pytestconfig.getoption("core_url").split(":") @@ -47,12 +36,3 @@ def enable_auth(): def feast_jobservice(pytestconfig): host, port = pytestconfig.getoption("job_service_url").split(":") return host, port - - -@pytest.fixture(scope="session") -def statsd_server(pytestconfig): - host, port = pytestconfig.getoption("statsd_url").split(":") - prometheus_host, prometheus_port = pytestconfig.getoption("prometheus_url").split( - ":" - ) - return PrometheusStatsDServer(host, port, prometheus_host, prometheus_port) diff --git a/tests/e2e/fixtures/feast_services.py b/tests/e2e/fixtures/feast_services.py deleted file mode 100644 index b2038bf6..00000000 --- a/tests/e2e/fixtures/feast_services.py +++ /dev/null @@ -1,212 +0,0 @@ -import os -import shutil -import socket -import subprocess -import tempfile -import time -from typing import Any, Dict, Tuple - -import pyspark -import pytest -import yaml -from pytest_postgresql.executor import PostgreSQLExecutor -from pytest_redis.executor import RedisExecutor - -__all__ = ( - "feast_core", - "feast_serving", - "enable_auth", - "feast_jobservice", -) - - -def _start_jar(jar, options=None) -> subprocess.Popen: - if not os.path.isfile(jar): - raise ValueError(f"{jar} doesn't exist") - - cmd = [shutil.which("java"), "-jar", jar] - if options: - cmd.extend(options) - - return subprocess.Popen(cmd) # type: ignore - - -def _wait_port_open(port, max_wait=60): - print(f"Waiting for port {port}") - start = time.time() - - while True: - try: - socket.create_connection(("localhost", port), timeout=1) - except OSError: - if time.time() - start > max_wait: - raise - - time.sleep(1) - else: - return - - -@pytest.fixture( - scope="session", params=[False], -) -def enable_auth(request): - return request.param - - -@pytest.fixture(scope="session") -def feast_core( - java_project_root, project_version, enable_auth, postgres_server: PostgreSQLExecutor -): - jar = str( - java_project_root / "core" / "target" / f"feast-core-{project_version}-exec.jar" - ) - config = dict( - feast=dict( - security=dict( - enabled=enable_auth, - provider="jwt", - options=dict( - jwkEndpointURI="https://www.googleapis.com/oauth2/v3/certs" - ), - ) - ), - spring=dict( - datasource=dict( - url=f"jdbc:postgresql://{postgres_server.host}:{postgres_server.port}/postgres" - ) - ), - ) - - with tempfile.NamedTemporaryFile(suffix=".yaml", mode="w+") as config_file: - yaml.dump(config, config_file) - config_file.flush() - - process = _start_jar( - jar, - [ - f"--spring.config.location=classpath:/application.yml,file://{config_file.name}" - ], - ) - _wait_port_open(6565) - yield "localhost", 6565 - process.terminate() - - -@pytest.fixture(scope="session") -def feast_serving( - java_project_root, - project_version, - enable_auth, - redis_server: RedisExecutor, - feast_core, - pytestconfig, -): - _wait_port_open(6565) # in case core is restarting with new config - - jar = str( - java_project_root - / "serving" - / "target" - / f"feast-serving-{project_version}-exec.jar" - ) - if pytestconfig.getoption("redis_cluster"): - store: Dict[str, Any] = dict( - name="online", - type="REDIS_CLUSTER", - config=dict(connection_string=f"{redis_server.host}:{redis_server.port}"), - ) - else: - store = dict( - name="online", - type="REDIS", - config=dict(host=redis_server.host, port=redis_server.port), - ) - - config = dict( - feast=dict( - stores=[store], - coreAuthentication=dict(enabled=enable_auth, provider="google"), - security=dict(authentication=dict(enabled=enable_auth, provider="jwt")), - ) - ) - - with tempfile.NamedTemporaryFile(suffix=".yaml", mode="w+") as config_file: - yaml.dump(config, config_file) - config_file.flush() - - process = _start_jar( - jar, - [ - f"--spring.config.location=classpath:/application.yml,file://{config_file.name}" - ], - ) - _wait_port_open(6566) - yield "localhost", 6566 - process.terminate() - - -@pytest.fixture(scope="session") -def feast_jobservice( - pytestconfig, - ingestion_job_jar, - redis_server: RedisExecutor, - feast_core: Tuple[str, int], - feast_serving: Tuple[str, int], - global_staging_path, -): - if not pytestconfig.getoption("with_job_service"): - yield None - else: - env = os.environ.copy() - - if pytestconfig.getoption("env") == "local": - env["FEAST_CORE_URL"] = f"{feast_core[0]}:{feast_core[1]}" - env["FEAST_SERVING_URL"] = f"{feast_serving[0]}:{feast_serving[1]}" - env["FEAST_SPARK_LAUNCHER"] = "standalone" - env["FEAST_SPARK_STANDALONE_MASTER"] = "local" - env["FEAST_SPARK_HOME"] = os.getenv("SPARK_HOME") or os.path.dirname( - pyspark.__file__ - ) - env["FEAST_SPARK_INGESTION_JAR"] = ingestion_job_jar - env["FEAST_REDIS_HOST"] = redis_server.host - env["FEAST_REDIS_PORT"] = str(redis_server.port) - env["FEAST_SPARK_STAGING_LOCATION"] = os.path.join( - global_staging_path, "spark" - ) - env["FEAST_HISTORICAL_FEATURE_OUTPUT_LOCATION"] = os.path.join( - global_staging_path, "historical_output" - ) - - if pytestconfig.getoption("env") == "gcloud": - env["FEAST_CORE_URL"] = f"{feast_core[0]}:{feast_core[1]}" - env["FEAST_SERVING_URL"] = f"{feast_serving[0]}:{feast_serving[1]}" - env["FEAST_SPARK_LAUNCHER"] = "dataproc" - env["FEAST_DATAPROC_CLUSTER_NAME"] = pytestconfig.getoption( - "dataproc_cluster_name" - ) - env["FEAST_DATAPROC_PROJECT"] = pytestconfig.getoption("dataproc_project") - env["FEAST_DATAPROC_REGION"] = pytestconfig.getoption("dataproc_region") - env["FEAST_DATAPROC_EXECUTOR_INSTANCES"] = pytestconfig.getoption( - "dataproc_executor_instances" - ) - env["FEAST_DATAPROC_EXECUTOR_CORES"] = pytestconfig.getoption( - "dataproc_executor_cores" - ) - env["FEAST_DATAPROC_EXECUTOR_MEMORY"] = pytestconfig.getoption( - "dataproc_executor_memory" - ) - env["FEAST_SPARK_STAGING_LOCATION"] = os.path.join( - global_staging_path, "dataproc" - ) - env["FEAST_SPARK_INGESTION_JAR"] = ingestion_job_jar - env["FEAST_REDIS_HOST"] = pytestconfig.getoption("redis_url").split(":")[0] - env["FEAST_REDIS_PORT"] = pytestconfig.getoption("redis_url").split(":")[1] - env["FEAST_HISTORICAL_FEATURE_OUTPUT_LOCATION"] = os.path.join( - global_staging_path, "historical_output" - ) - - process = subprocess.Popen(["feast-spark", "server"], env=env) - _wait_port_open(6568) - yield "localhost", 6568 - process.terminate() diff --git a/tests/e2e/fixtures/services.py b/tests/e2e/fixtures/services.py deleted file mode 100644 index c74ee342..00000000 --- a/tests/e2e/fixtures/services.py +++ /dev/null @@ -1,74 +0,0 @@ -import os -import pathlib -import shutil - -import port_for -import pytest -import requests -from pytest_kafka import make_kafka_server, make_zookeeper_process -from pytest_postgresql import factories as pg_factories -from pytest_redis import factories as redis_factories - -__all__ = ( - "kafka_server", - "kafka_port", - "zookeeper_server", - "postgres_server", - "redis_server", - "statsd_server", -) - -from tests.e2e.fixtures.statsd_stub import StatsDStub - - -def download_kafka(version="2.12-2.6.0"): - temp_dir = pathlib.Path("/tmp") - local_path = temp_dir / f"kafka_{version}.tgz" - - if not os.path.isfile(local_path): - r = requests.get( - f"https://archive.apache.org/dist/kafka/2.6.0/kafka_{version}.tgz" - ) - - r.raise_for_status() - - with open(local_path, "wb") as f: - f.write(r.content) - - shutil.unpack_archive(str(local_path), str(temp_dir)) - return temp_dir / f"kafka_{version}" / "bin" - - -@pytest.fixture -def kafka_server(kafka_port): - _, port = kafka_port - return "localhost", port - - -@pytest.fixture -def statsd_server(): - port = port_for.select_random(None) - server = StatsDStub(port=port) - server.start() - yield server - server.stop() - - -postgres_server = pg_factories.postgresql_proc(password="password") -redis_server = redis_factories.redis_proc( - executable=shutil.which("redis-server"), timeout=3600 -) - -KAFKA_BIN = download_kafka() -zookeeper_server = make_zookeeper_process( - str(KAFKA_BIN / "zookeeper-server-start.sh"), - zk_config_template=""" -dataDir={zk_data_dir} -clientPort={zk_port} -maxClientCnxns=0 -admin.enableServer=false""", -) -kafka_port = make_kafka_server( - kafka_bin=str(KAFKA_BIN / "kafka-server-start.sh"), - zookeeper_fixture_name="zookeeper_server", -) diff --git a/tests/e2e/fixtures/statsd_stub.py b/tests/e2e/fixtures/statsd_stub.py deleted file mode 100644 index ff85cb6e..00000000 --- a/tests/e2e/fixtures/statsd_stub.py +++ /dev/null @@ -1,115 +0,0 @@ -import select -import socket -import threading -from collections import defaultdict, namedtuple -from typing import Dict - -import requests - -MetricLine = namedtuple("MetricLine", ["name", "value", "type"]) - - -class StatsDServer: - host: str - port: int - metrics: Dict[str, int] - - -class PrometheusStatsDServer(StatsDServer): - def __init__( - self, - statsd_host: str, - statsd_port: int, - prometheus_host: str, - prometheus_port: int, - ): - self.host = statsd_host - self.port = statsd_port - - self.prometheus_host = prometheus_host - self.prometheus_port = prometheus_port - - @property - def metrics(self): - """ Parse Prometheus response into metrics dict """ - - data = requests.get( - f"http://{self.prometheus_host}:{self.prometheus_port}/metrics" - ).content.decode() - lines = [line for line in data.split("\n") if not line.startswith("#")] - metrics = {} - for line in lines: - if not line: - continue - - name, value = line.split(" ") - - try: - value = int(value) # type: ignore - except ValueError: - value = float(value) # type: ignore - - if "{" in name and "}" in name: - base = name[: name.index("{")] - tags = name[name.index("{") + 1 : -1] - tags = [tag.split("=") for tag in tags.split(",")] - tags = [(key, val.replace('"', "")) for key, val in tags] - - name = base + "#" + ",".join(f"{k}:{v}" for k, v in sorted(tags)) - - metrics[name] = value - - return metrics - - -def parse_metric_line(line: str) -> MetricLine: - parts = line.split("|") - name, value = parts[0].split(":") - type_ = parts[1] - - try: - value = int(value) # type: ignore - except ValueError: - value = float(value) # type: ignore - - if len(parts) == 3 and parts[2].startswith("#"): - # Add tags to name - tags = sorted(parts[2][1:].split(",")) - name = name + "#" + ",".join(tags) - - return MetricLine(name, value, type_) - - -class StatsDStub(StatsDServer): - def __init__(self, port: int): - self.host = "localhost" - self.port = port - - self._stop_event = threading.Event() - self.metrics = defaultdict(lambda: 0) - - def serve(self): - sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - sock.bind((self.host, self.port)) - sock.setblocking(False) - - while True: - ready = select.select([sock], [], [], 1) - if ready[0]: - data = sock.recv(65535) - - lines = data.decode("utf-8").split("\n") - for line in lines: - print("Metric received:", line) - m = parse_metric_line(line) - self.metrics[m.name] += m.value - - if self._stop_event.wait(0.01): - break - - def start(self): - t = threading.Thread(target=self.serve) - t.start() - - def stop(self): - self._stop_event.set() diff --git a/tests/e2e/test_historical_features.py b/tests/e2e/test_historical_features.py index 3b6bb09b..9168e26f 100644 --- a/tests/e2e/test_historical_features.py +++ b/tests/e2e/test_historical_features.py @@ -11,7 +11,6 @@ from pyarrow import parquet from feast import Client, Entity, Feature, FeatureTable, ValueType -from feast.constants import ConfigOptions as opt from feast.data_source import BigQuerySource, FileSource from feast_spark import Client as SparkClient from feast_spark.pyspark.abc import SparkJobStatus @@ -87,17 +86,9 @@ def generate_data(): return transactions_df, customer_df -def _get_azure_creds(feast_client: Client): - return ( - feast_client._config.get(opt.AZURE_BLOB_ACCOUNT_NAME, None), - feast_client._config.get(opt.AZURE_BLOB_ACCOUNT_ACCESS_KEY, None), - ) - - def test_historical_features( feast_client: Client, feast_spark_client: SparkClient, - tfrecord_feast_client: Client, batch_source: Union[BigQuerySource, FileSource], ): customer_entity = Entity( @@ -134,12 +125,7 @@ def test_historical_features( output_dir = job.get_output_file_uri() - # will both be None if not using Azure blob storage - account_name, account_key = _get_azure_creds(feast_client) - - joined_df = read_parquet( - output_dir, azure_account_name=account_name, azure_account_key=account_key - ) + joined_df = read_parquet(output_dir) expected_joined_df = pd.DataFrame( { diff --git a/tests/e2e/test_job_scheduling.py b/tests/e2e/test_job_scheduling.py index 31c6e8e5..dc2deb1a 100644 --- a/tests/e2e/test_job_scheduling.py +++ b/tests/e2e/test_job_scheduling.py @@ -1,7 +1,6 @@ import hashlib import uuid -import pytest as pytest from kubernetes import client, config from feast import Client, Entity, Feature, FeatureTable, FileSource, ValueType @@ -9,7 +8,6 @@ from feast_spark import Client as SparkClient -@pytest.mark.env("k8s") def test_schedule_batch_ingestion_jobs( pytestconfig, feast_client: Client, feast_spark_client: SparkClient ): diff --git a/tests/e2e/test_online_features.py b/tests/e2e/test_online_features.py index 75e4301d..1903cb4d 100644 --- a/tests/e2e/test_online_features.py +++ b/tests/e2e/test_online_features.py @@ -2,13 +2,11 @@ import os import time import uuid -from datetime import datetime, timedelta +from datetime import timedelta from typing import Union import numpy as np import pandas as pd -import pytest -from google.cloud import bigquery from feast import ( BigQuerySource, @@ -23,7 +21,7 @@ from feast.data_format import AvroFormat, ParquetFormat from feast.wait import wait_retry_backoff from feast_spark import Client as SparkClient -from feast_spark.pyspark.abc import SparkJobStatus +from feast_spark.pyspark.abc import SparkJob, SparkJobStatus from tests.e2e.utils.kafka import check_consumer_exist, ingest_and_retrieve @@ -62,45 +60,6 @@ def test_offline_ingestion( ingest_and_verify(feast_client, feast_spark_client, feature_table, original) -@pytest.mark.env("gcloud") -def test_offline_ingestion_from_bq_view( - pytestconfig, bq_dataset, feast_client: Client, feast_spark_client: SparkClient -): - original = generate_data() - bq_project = pytestconfig.getoption("bq_project") - - bq_client = bigquery.Client(project=bq_project) - source_ref = bigquery.TableReference( - bigquery.DatasetReference(bq_project, bq_dataset), - f"ingestion_source_{datetime.now():%Y%m%d%H%M%s}", - ) - bq_client.load_table_from_dataframe(original, source_ref).result() - - view_ref = bigquery.TableReference( - bigquery.DatasetReference(bq_project, bq_dataset), - f"ingestion_view_{datetime.now():%Y%m%d%H%M%s}", - ) - view = bigquery.Table(view_ref) - view.view_query = f"select * from `{source_ref.project}.{source_ref.dataset_id}.{source_ref.table_id}`" - bq_client.create_table(view) - - entity = Entity(name="s2id", description="S2id", value_type=ValueType.INT64) - feature_table = FeatureTable( - name="bq_ingestion", - entities=["s2id"], - features=[Feature("unique_drivers", ValueType.INT64)], - batch_source=BigQuerySource( - event_timestamp_column="event_timestamp", - table_ref=f"{view_ref.project}:{view_ref.dataset_id}.{view_ref.table_id}", - ), - ) - - feast_client.apply(entity) - feast_client.apply(feature_table) - - ingest_and_verify(feast_client, feast_spark_client, feature_table, original) - - def test_streaming_ingestion( feast_client: Client, feast_spark_client: SparkClient, @@ -137,7 +96,7 @@ def test_streaming_ingestion( job = feast_spark_client.start_stream_to_online_ingestion(feature_table) assert job.get_feature_table() == feature_table.name wait_retry_backoff( - lambda: (None, job.get_status() == SparkJobStatus.IN_PROGRESS), 180 + lambda: (None, job.get_status() == SparkJobStatus.IN_PROGRESS), 300 ) else: job = None @@ -172,6 +131,12 @@ def test_streaming_ingestion( ) +def wait_for_job_to_complete(job: SparkJob): + wait_retry_backoff( + lambda: (None, job.get_status() == SparkJobStatus.COMPLETED), 180 + ) + + def ingest_and_verify( feast_client: Client, feast_spark_client: SparkClient, @@ -185,9 +150,7 @@ def ingest_and_verify( ) assert job.get_feature_table() == feature_table.name - wait_retry_backoff( - lambda: (None, job.get_status() == SparkJobStatus.COMPLETED), 180 - ) + wait_for_job_to_complete(job) features = feast_client.get_online_features( [f"{feature_table.name}:unique_drivers"], @@ -231,9 +194,7 @@ def test_list_jobs_long_table_name( data_sample.event_timestamp.max().to_pydatetime() + timedelta(seconds=1), ) - wait_retry_backoff( - lambda: (None, job.get_status() == SparkJobStatus.COMPLETED), 180 - ) + wait_for_job_to_complete(job) all_job_ids = [ job.get_id() for job in feast_spark_client.list_jobs( diff --git a/tests/e2e/test_register.py b/tests/e2e/test_register.py index 23a2adfc..e7b2ac31 100644 --- a/tests/e2e/test_register.py +++ b/tests/e2e/test_register.py @@ -1,22 +1,13 @@ -from datetime import datetime - -import numpy as np -import pandas as pd import pytest -import pytz -from google.api_core.exceptions import NotFound -from google.cloud import bigquery from google.protobuf.duration_pb2 import Duration -from pandas.testing import assert_frame_equal from feast.client import Client from feast.data_format import ParquetFormat, ProtoFormat -from feast.data_source import BigQuerySource, FileSource, KafkaSource +from feast.data_source import FileSource, KafkaSource from feast.entity import Entity from feast.feature import Feature from feast.feature_table import FeatureTable from feast.value_type import ValueType -from feast.wait import wait_retry_backoff @pytest.fixture @@ -78,19 +69,6 @@ def basic_featuretable(): ) -@pytest.fixture -def bq_dataframe(): - N_ROWS = 100 - time_offset = datetime.utcnow().replace(tzinfo=pytz.utc) - return pd.DataFrame( - { - "datetime": [time_offset] * N_ROWS, - "dev_feature_float": [np.float(row) for row in range(N_ROWS)], - "dev_feature_string": ["feat_" + str(row) for row in range(N_ROWS)], - } - ) - - @pytest.fixture def alltypes_entity(): return Entity( @@ -205,66 +183,3 @@ def test_get_list_alltypes( ft for ft in feast_client.list_feature_tables() if ft.name == "alltypes" ][0] assert actual_list_feature_table == alltypes_featuretable - - -@pytest.mark.env("gcloud") -def test_ingest_into_bq( - feast_client: Client, - customer_entity: Entity, - driver_entity: Entity, - bq_dataframe: pd.DataFrame, - bq_dataset: str, - pytestconfig, -): - bq_project = pytestconfig.getoption("bq_project") - bq_table_id = f"bq_staging_{datetime.now():%Y%m%d%H%M%s}" - ft = FeatureTable( - name="basic_featuretable", - entities=["driver_id", "customer_id"], - features=[ - Feature(name="dev_feature_float", dtype=ValueType.FLOAT), - Feature(name="dev_feature_string", dtype=ValueType.STRING), - ], - max_age=Duration(seconds=3600), - batch_source=BigQuerySource( - table_ref=f"{bq_project}:{bq_dataset}.{bq_table_id}", - event_timestamp_column="datetime", - created_timestamp_column="timestamp", - ), - ) - - # ApplyEntity - feast_client.apply(customer_entity) - feast_client.apply(driver_entity) - - # ApplyFeatureTable - feast_client.apply(ft) - feast_client.ingest(ft, bq_dataframe, timeout=120) - - bq_client = bigquery.Client(project=bq_project) - - # Poll BQ for table until the table has been created - def try_get_table(): - try: - table = bq_client.get_table( - bigquery.TableReference( - bigquery.DatasetReference(bq_project, bq_dataset), bq_table_id - ) - ) - except NotFound: - return None, False - else: - return table, True - - wait_retry_backoff( - retry_fn=try_get_table, - timeout_secs=30, - timeout_msg="Timed out trying to get bigquery table", - ) - - query_string = f"SELECT * FROM `{bq_project}.{bq_dataset}.{bq_table_id}`" - - job = bq_client.query(query_string) - query_df = job.to_dataframe() - - assert_frame_equal(query_df, bq_dataframe) diff --git a/tests/e2e/test_validation.py b/tests/e2e/test_validation.py index 55f69a72..f3c2b31f 100644 --- a/tests/e2e/test_validation.py +++ b/tests/e2e/test_validation.py @@ -10,7 +10,6 @@ from feast.wait import wait_retry_backoff from feast_spark import Client as SparkClient from feast_spark.contrib.validation.ge import apply_validation, create_validation_udf -from tests.e2e.fixtures.statsd_stub import StatsDServer from tests.e2e.utils.common import avro_schema, create_schema, start_job, stop_job from tests.e2e.utils.kafka import check_consumer_exist, ingest_and_retrieve @@ -103,11 +102,7 @@ def test_validation_with_ge( @pytest.mark.env("local") def test_validation_reports_metrics( - feast_client: Client, - feast_spark_client: SparkClient, - kafka_server, - statsd_server: StatsDServer, - pytestconfig, + feast_client: Client, feast_spark_client: SparkClient, kafka_server, pytestconfig, ): kafka_broker = f"{kafka_server[0]}:{kafka_server[1]}" topic_name = f"avro-{uuid.uuid4()}" @@ -138,14 +133,6 @@ def test_validation_reports_metrics( test_data = generate_test_data() ge_ds = PandasDataset(test_data) validation_result = ge_ds.validate(expectations, result_format="COMPLETE") - unexpected_counts = { - "expect_column_values_to_be_between_num_0_100": validation_result.results[ - 0 - ].result["unexpected_count"], - "expect_column_values_to_be_in_set_set": validation_result.results[1].result[ - "unexpected_count" - ], - } invalid_idx = list( { idx @@ -169,23 +156,3 @@ def test_validation_reports_metrics( ) finally: stop_job(job, feast_spark_client, feature_table) - - expected_metrics = [ - ( - f"feast_feature_validation_check_failed#check:{check_name}," - f"feature_table:{feature_table.name},project:{feast_client.project}", - value, - ) - for check_name, value in unexpected_counts.items() - ] - wait_retry_backoff( - lambda: ( - None, - all(statsd_server.metrics.get(m) == v for m, v in expected_metrics), - ), - timeout_secs=30, - timeout_msg="Expected metrics were not received: " - + str(expected_metrics) - + "\n" - "Actual received metrics" + str(statsd_server.metrics), - ) diff --git a/tests/requirements.txt b/tests/requirements.txt index 3f5bb649..bf9d9ad4 100644 --- a/tests/requirements.txt +++ b/tests/requirements.txt @@ -6,9 +6,6 @@ pytest-benchmark==3.2.2 pytest-mock==1.10.4 pytest-ordering==0.6.* pytest-xdist==2.1.0 -pytest-postgresql==2.5.1 -pytest-redis==2.0.0 -pytest-kafka==0.4.0 deepdiff==4.3.2 kafka-python==2.0.2 great-expectations==0.13.2