From 30952964e753468489d7fbf7def82fe7ac95a499 Mon Sep 17 00:00:00 2001 From: Srihari Date: Mon, 26 May 2025 10:37:41 +0530 Subject: [PATCH] test: Add Operator E2E tests for Feast Apply and Materialize functionality Signed-off-by: Srihari --- infra/feast-operator/test/e2e/e2e_test.go | 34 ++- .../previous-version/previous_version_test.go | 4 +- .../feast_integration_test_crs/feast.yaml | 55 ++++ .../feast_integration_test_crs/postgres.yaml | 55 ++++ .../feast_integration_test_crs/redis.yaml | 39 +++ ...v1alpha1_remote_registry_featurestore.yaml | 2 +- .../test/upgrade/upgrade_test.go | 4 +- infra/feast-operator/test/utils/test_util.go | 276 +++++++++++++++++- 8 files changed, 449 insertions(+), 20 deletions(-) create mode 100644 infra/feast-operator/test/testdata/feast_integration_test_crs/feast.yaml create mode 100644 infra/feast-operator/test/testdata/feast_integration_test_crs/postgres.yaml create mode 100644 infra/feast-operator/test/testdata/feast_integration_test_crs/redis.yaml diff --git a/infra/feast-operator/test/e2e/e2e_test.go b/infra/feast-operator/test/e2e/e2e_test.go index d1051900ae5..fb2ce69992a 100644 --- a/infra/feast-operator/test/e2e/e2e_test.go +++ b/infra/feast-operator/test/e2e/e2e_test.go @@ -17,11 +17,16 @@ limitations under the License. package e2e import ( + "fmt" + "os" + "github.com/feast-dev/feast/infra/feast-operator/test/utils" . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" ) var _ = Describe("controller", Ordered, func() { + _, isRunOnOpenShiftCI := os.LookupEnv("RUN_ON_OPENSHIFT_CI") featureStoreName := "simple-feast-setup" feastResourceName := utils.FeastPrefix + featureStoreName feastK8sResourceNames := []string{ @@ -29,26 +34,43 @@ var _ = Describe("controller", Ordered, func() { feastResourceName + "-offline", feastResourceName + "-ui", } + namespace := "test-ns-feast" + defaultFeatureStoreCRTest := "TesDefaultFeastCR" + remoteRegistryFeatureStoreCRTest := "TestRemoteRegistryFeastCR" + applyAndMaterializeTest := "TestApplyAndMaterializeFeastDefinitions" runTestDeploySimpleCRFunc := utils.GetTestDeploySimpleCRFunc("/test/e2e", "test/testdata/feast_integration_test_crs/v1alpha1_default_featurestore.yaml", - featureStoreName, feastResourceName, feastK8sResourceNames) + featureStoreName, feastResourceName, feastK8sResourceNames, namespace) runTestWithRemoteRegistryFunction := utils.GetTestWithRemoteRegistryFunc("/test/e2e", "test/testdata/feast_integration_test_crs/v1alpha1_default_featurestore.yaml", "test/testdata/feast_integration_test_crs/v1alpha1_remote_registry_featurestore.yaml", - featureStoreName, feastResourceName, feastK8sResourceNames) + featureStoreName, feastResourceName, feastK8sResourceNames, namespace) + + runTestApplyAndMaterializeFunc := utils.RunTestApplyAndMaterializeFunc("/test/e2e", namespace, "credit-scoring", utils.FeastPrefix+"credit-scoring") BeforeAll(func() { - utils.DeployOperatorFromCode("/test/e2e", false) + if !isRunOnOpenShiftCI { + utils.DeployOperatorFromCode("/test/e2e", false) + } + By(fmt.Sprintf("Creating test namespace: %s", namespace)) + err := utils.CreateNamespace(namespace, "/test/e2e") + Expect(err).ToNot(HaveOccurred(), fmt.Sprintf("failed to create namespace %s", namespace)) }) AfterAll(func() { - utils.DeleteOperatorDeployment("/test/e2e") + if !isRunOnOpenShiftCI { + utils.DeleteOperatorDeployment("/test/e2e") + } + By(fmt.Sprintf("Deleting test namespace: %s", namespace)) + err := utils.DeleteNamespace(namespace, "/test/e2e") + Expect(err).ToNot(HaveOccurred(), fmt.Sprintf("failed to delete namespace %s", namespace)) }) Context("Operator E2E Tests", func() { - It("Should be able to deploy and run a default feature store CR successfully", runTestDeploySimpleCRFunc) - It("Should be able to deploy and run a feature store with remote registry CR successfully", runTestWithRemoteRegistryFunction) + It("Should be able to deploy and run a "+defaultFeatureStoreCRTest+" successfully", runTestDeploySimpleCRFunc) + It("Should be able to deploy and run a "+remoteRegistryFeatureStoreCRTest+" successfully", runTestWithRemoteRegistryFunction) + It("Should be able to apply and run a "+applyAndMaterializeTest+" materialize feature store definitions using CronJob successfully", runTestApplyAndMaterializeFunc) }) }) diff --git a/infra/feast-operator/test/previous-version/previous_version_test.go b/infra/feast-operator/test/previous-version/previous_version_test.go index 9775d239bcc..9d0220674fb 100644 --- a/infra/feast-operator/test/previous-version/previous_version_test.go +++ b/infra/feast-operator/test/previous-version/previous_version_test.go @@ -38,9 +38,9 @@ var _ = Describe("previous version operator", Ordered, func() { } runTestDeploySimpleCRFunc := utils.GetTestDeploySimpleCRFunc("/test/upgrade", utils.GetSimplePreviousVerCR(), - utils.FeatureStoreName, utils.FeastResourceName, feastK8sResourceNames) + utils.FeatureStoreName, utils.FeastResourceName, feastK8sResourceNames, "default") runTestWithRemoteRegistryFunction := utils.GetTestWithRemoteRegistryFunc("/test/upgrade", utils.GetSimplePreviousVerCR(), - utils.GetRemoteRegistryPreviousVerCR(), utils.FeatureStoreName, utils.FeastResourceName, feastK8sResourceNames) + utils.GetRemoteRegistryPreviousVerCR(), utils.FeatureStoreName, utils.FeastResourceName, feastK8sResourceNames, "default") // Run Test on previous version operator It("Should be able to deploy and run a default feature store CR successfully", runTestDeploySimpleCRFunc) diff --git a/infra/feast-operator/test/testdata/feast_integration_test_crs/feast.yaml b/infra/feast-operator/test/testdata/feast_integration_test_crs/feast.yaml new file mode 100644 index 00000000000..8d311ab1de1 --- /dev/null +++ b/infra/feast-operator/test/testdata/feast_integration_test_crs/feast.yaml @@ -0,0 +1,55 @@ +apiVersion: v1 +kind: Secret +metadata: + name: feast-data-stores + namespace: test-ns-feast +stringData: + redis: | + connection_string: redis.test-ns-feast.svc.cluster.local:6379 + sql: | + path: postgresql+psycopg://${POSTGRES_USER}:${POSTGRES_PASSWORD}@postgres.test-ns-feast.svc.cluster.local:5432/${POSTGRES_DB} + cache_ttl_seconds: 60 + sqlalchemy_config_kwargs: + echo: false + pool_pre_ping: true +--- +apiVersion: feast.dev/v1alpha1 +kind: FeatureStore +metadata: + name: credit-scoring + namespace: test-ns-feast +spec: + feastProject: credit_scoring_local + feastProjectDir: + git: + url: https://github.com/feast-dev/feast-credit-score-local-tutorial + ref: 598a270 + services: + offlineStore: + persistence: + file: + type: duckdb + onlineStore: + persistence: + store: + type: redis + secretRef: + name: feast-data-stores + server: + envFrom: + - secretRef: + name: postgres-secret + env: + - name: MPLCONFIGDIR + value: /tmp + resources: + requests: + cpu: 150m + memory: 128Mi + registry: + local: + persistence: + store: + type: sql + secretRef: + name: feast-data-stores diff --git a/infra/feast-operator/test/testdata/feast_integration_test_crs/postgres.yaml b/infra/feast-operator/test/testdata/feast_integration_test_crs/postgres.yaml new file mode 100644 index 00000000000..f6799e1c700 --- /dev/null +++ b/infra/feast-operator/test/testdata/feast_integration_test_crs/postgres.yaml @@ -0,0 +1,55 @@ +apiVersion: v1 +kind: Secret +metadata: + name: postgres-secret + namespace: test-ns-feast +stringData: + POSTGRES_DB: feast + POSTGRES_USER: feast + POSTGRES_PASSWORD: feast +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: postgres + namespace: test-ns-feast +spec: + replicas: 1 + selector: + matchLabels: + app: postgres + template: + metadata: + labels: + app: postgres + spec: + containers: + - name: postgres + image: 'postgres:16-alpine' + ports: + - containerPort: 5432 + envFrom: + - secretRef: + name: postgres-secret + volumeMounts: + - mountPath: /var/lib/postgresql + name: postgresdata + volumes: + - name: postgresdata + emptyDir: {} +--- +apiVersion: v1 +kind: Service +metadata: + name: postgres + namespace: test-ns-feast + labels: + app: postgres +spec: + type: ClusterIP + ports: + - port: 5432 + targetPort: 5432 + protocol: TCP + selector: + app: postgres diff --git a/infra/feast-operator/test/testdata/feast_integration_test_crs/redis.yaml b/infra/feast-operator/test/testdata/feast_integration_test_crs/redis.yaml new file mode 100644 index 00000000000..57f2765e97e --- /dev/null +++ b/infra/feast-operator/test/testdata/feast_integration_test_crs/redis.yaml @@ -0,0 +1,39 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: redis + namespace: test-ns-feast +spec: + replicas: 1 + selector: + matchLabels: + app: redis + template: + metadata: + labels: + app: redis + spec: + containers: + - name: redis + image: 'quay.io/sclorg/redis-7-c9s' + ports: + - containerPort: 6379 + env: + - name: ALLOW_EMPTY_PASSWORD + value: "yes" +--- +apiVersion: v1 +kind: Service +metadata: + name: redis + namespace: test-ns-feast + labels: + app: redis +spec: + type: ClusterIP + ports: + - port: 6379 + targetPort: 6379 + protocol: TCP + selector: + app: redis diff --git a/infra/feast-operator/test/testdata/feast_integration_test_crs/v1alpha1_remote_registry_featurestore.yaml b/infra/feast-operator/test/testdata/feast_integration_test_crs/v1alpha1_remote_registry_featurestore.yaml index 9746e3819a5..e90d9142a5a 100644 --- a/infra/feast-operator/test/testdata/feast_integration_test_crs/v1alpha1_remote_registry_featurestore.yaml +++ b/infra/feast-operator/test/testdata/feast_integration_test_crs/v1alpha1_remote_registry_featurestore.yaml @@ -12,4 +12,4 @@ spec: remote: feastRef: name: simple-feast-setup - namespace: default \ No newline at end of file + namespace: test-ns-feast diff --git a/infra/feast-operator/test/upgrade/upgrade_test.go b/infra/feast-operator/test/upgrade/upgrade_test.go index 313fa41213c..3b7ba789767 100644 --- a/infra/feast-operator/test/upgrade/upgrade_test.go +++ b/infra/feast-operator/test/upgrade/upgrade_test.go @@ -33,9 +33,9 @@ var _ = Describe("operator upgrade", Ordered, func() { Context("Operator upgrade Tests", func() { runTestDeploySimpleCRFunc := utils.GetTestDeploySimpleCRFunc("/test/upgrade", utils.GetSimplePreviousVerCR(), - utils.FeatureStoreName, utils.FeastResourceName, []string{}) + utils.FeatureStoreName, utils.FeastResourceName, []string{}, "default") runTestWithRemoteRegistryFunction := utils.GetTestWithRemoteRegistryFunc("/test/upgrade", utils.GetSimplePreviousVerCR(), - utils.GetRemoteRegistryPreviousVerCR(), utils.FeatureStoreName, utils.FeastResourceName, []string{}) + utils.GetRemoteRegistryPreviousVerCR(), utils.FeatureStoreName, utils.FeastResourceName, []string{}, "default") // Run Test on current version operator with previous version CR It("Should be able to deploy and run a default feature store CR successfully", runTestDeploySimpleCRFunc) diff --git a/infra/feast-operator/test/utils/test_util.go b/infra/feast-operator/test/utils/test_util.go index b34c4272c46..41dcf624fae 100644 --- a/infra/feast-operator/test/utils/test_util.go +++ b/infra/feast-operator/test/utils/test_util.go @@ -7,6 +7,7 @@ import ( "fmt" "os" "os/exec" + "regexp" "strings" "time" @@ -285,10 +286,9 @@ func validateTheFeatureStoreCustomResource(namespace string, featureStoreName st } // GetTestDeploySimpleCRFunc - returns a simple CR deployment function -func GetTestDeploySimpleCRFunc(testDir string, crYaml string, featureStoreName string, feastResourceName string, feastK8sResourceNames []string) func() { +func GetTestDeploySimpleCRFunc(testDir string, crYaml string, featureStoreName string, feastResourceName string, feastK8sResourceNames []string, namespace string) func() { return func() { By("deploying the Simple Feast Custom Resource to Kubernetes") - namespace := "default" cmd := exec.Command("kubectl", "apply", "-f", crYaml, "-n", namespace) _, cmdOutputerr := Run(cmd, testDir) @@ -297,27 +297,31 @@ func GetTestDeploySimpleCRFunc(testDir string, crYaml string, featureStoreName s validateTheFeatureStoreCustomResource(namespace, featureStoreName, feastResourceName, feastK8sResourceNames, Timeout) By("deleting the feast deployment") - cmd = exec.Command("kubectl", "delete", "-f", crYaml) + cmd = exec.Command("kubectl", "delete", "-f", crYaml, "-n", namespace) _, cmdOutputerr = Run(cmd, testDir) ExpectWithOffset(1, cmdOutputerr).NotTo(HaveOccurred()) } } // GetTestWithRemoteRegistryFunc - returns a CR deployment with a remote registry function -func GetTestWithRemoteRegistryFunc(testDir string, crYaml string, remoteRegistryCRYaml string, featureStoreName string, feastResourceName string, feastK8sResourceNames []string) func() { +func GetTestWithRemoteRegistryFunc(testDir string, crYaml string, remoteRegistryCRYaml string, featureStoreName string, feastResourceName string, feastK8sResourceNames []string, namespace string) func() { return func() { By("deploying the Simple Feast Custom Resource to Kubernetes") - namespace := "default" cmd := exec.Command("kubectl", "apply", "-f", crYaml, "-n", namespace) _, cmdOutputErr := Run(cmd, testDir) ExpectWithOffset(1, cmdOutputErr).NotTo(HaveOccurred()) validateTheFeatureStoreCustomResource(namespace, featureStoreName, feastResourceName, feastK8sResourceNames, Timeout) - var remoteRegistryNs = "remote-registry" - By(fmt.Sprintf("Creating the remote registry namespace=%s", remoteRegistryNs)) - cmd = exec.Command("kubectl", "create", "ns", remoteRegistryNs) - _, _ = Run(cmd, testDir) + var remoteRegistryNs = "test-ns-remote-registry" + err := CreateNamespace(remoteRegistryNs, "/test/e2e") + Expect(err).ToNot(HaveOccurred(), fmt.Sprintf("failed to create namespace %s", remoteRegistryNs)) + + DeferCleanup(func() { + By(fmt.Sprintf("Deleting remote registry namespace: %s", remoteRegistryNs)) + err := DeleteNamespace(remoteRegistryNs, testDir) + Expect(err).ToNot(HaveOccurred(), fmt.Sprintf("failed to delete namespace %s", remoteRegistryNs)) + }) By("deploying the Simple Feast remote registry Custom Resource on Kubernetes") cmd = exec.Command("kubectl", "apply", "-f", remoteRegistryCRYaml, "-n", remoteRegistryNs) @@ -446,3 +450,257 @@ func GetSimplePreviousVerCR() string { func GetRemoteRegistryPreviousVerCR() string { return fmt.Sprintf("https://raw.githubusercontent.com/feast-dev/feast/refs/tags/v%s/infra/feast-operator/test/testdata/feast_integration_test_crs/v1alpha1_remote_registry_featurestore.yaml", feastversion.FeastVersion) } + +// CreateNamespace - create the namespace for tests +func CreateNamespace(namespace string, testDir string) error { + cmd := exec.Command("kubectl", "create", "ns", namespace) + output, err := Run(cmd, testDir) + if err != nil { + return fmt.Errorf("failed to create namespace %s: %v\nOutput: %s", namespace, err, output) + } + return nil +} + +// DeleteNamespace - Delete the namespace for tests +func DeleteNamespace(namespace string, testDir string) error { + cmd := exec.Command("kubectl", "delete", "ns", namespace, "--timeout=180s") + output, err := Run(cmd, testDir) + if err != nil { + return fmt.Errorf("failed to delete namespace %s: %v\nOutput: %s", namespace, err, output) + } + return nil +} + +// Test real-time credit scoring demo by applying feature store configs and verifying Feast definitions, materializing data, and executing a training and prediction jobs +func RunTestApplyAndMaterializeFunc(testDir string, namespace string, feastCRName string, feastDeploymentName string) func() { + return func() { + applyFeastInfraManifestsAndVerify(namespace, testDir) + applyFeastYamlAndVerify(namespace, testDir, feastDeploymentName, feastCRName) + VerifyApplyFeatureStoreDefinitions(namespace, feastCRName, feastDeploymentName) + VerifyFeastMethods(namespace, feastDeploymentName, testDir) + TrainAndTestModel(namespace, feastCRName, feastDeploymentName, testDir) + } +} + +// applies the manifests for Redis and Postgres and checks whether the deployments become available +func applyFeastInfraManifestsAndVerify(namespace string, testDir string) { + By("Applying postgres.yaml and redis.yaml manifests") + cmd := exec.Command("kubectl", "apply", "-n", namespace, "-f", "test/testdata/feast_integration_test_crs/postgres.yaml", "-f", "test/testdata/feast_integration_test_crs/redis.yaml") + _, cmdOutputerr := Run(cmd, testDir) + ExpectWithOffset(1, cmdOutputerr).NotTo(HaveOccurred()) + checkDeployment(namespace, "postgres") + checkDeployment(namespace, "redis") +} + +// validates the `feast apply` and `feast materialize-incremental commands were configured in the FeatureStore CR's CronJob config. +func VerifyApplyFeatureStoreDefinitions(namespace string, feastCRName string, feastDeploymentName string) { + By("Verify CronJob commands in FeatureStore CR") + cmd := exec.Command("kubectl", "get", "-n", namespace, "feast/"+feastCRName, "-o", "jsonpath={.status.applied.cronJob.containerConfigs.commands}") + output, err := cmd.CombinedOutput() + Expect(err).NotTo(HaveOccurred(), fmt.Sprintf("Failed to fetch CronJob commands:\n%s", output)) + commands := string(output) + fmt.Println("CronJob commands:", commands) + Expect(commands).To(ContainSubstring(`feast apply`)) + Expect(commands).To(ContainSubstring(`feast materialize-incremental $(date -u +'%Y-%m-%dT%H:%M:%S')`)) + + CreateAndVerifyJobFromCron(namespace, feastDeploymentName, "feast-test-apply", "", []string{ + "No project found in the repository", + "Applying changes for project credit_scoring_local", + "Deploying infrastructure for credit_history", + "Deploying infrastructure for zipcode_features", + "Materializing 2 feature views to", + "into the redis online store", + "credit_history from", + "zipcode_features from", + }) + +} + +// checks for the presence of expected entities, features, feature views, data sources, etc. +func VerifyFeastMethods(namespace string, feastDeploymentName string, testDir string) { + type feastCheck struct { + command []string + expected []string + logPrefix string + } + checks := []feastCheck{ + { + command: []string{"feast", "projects", "list"}, + expected: []string{"credit_scoring_local"}, + logPrefix: "Projects List", + }, + { + command: []string{"feast", "feature-views", "list"}, + expected: []string{"credit_history", "zipcode_features", "total_debt_calc"}, + logPrefix: "Feature Views List", + }, + { + command: []string{"feast", "entities", "list"}, + expected: []string{"zipcode", "dob_ssn"}, + logPrefix: "Entities List", + }, + { + command: []string{"feast", "data-sources", "list"}, + expected: []string{"Zipcode source", "Credit history", "application_data"}, + logPrefix: "Data Sources List", + }, + { + command: []string{"feast", "features", "list"}, + expected: []string{ + "credit_card_due", "mortgage_due", "student_loan_due", "vehicle_loan_due", + "hard_pulls", "missed_payments_2y", "missed_payments_1y", "missed_payments_6m", + "bankruptcies", "city", "state", "location_type", "tax_returns_filed", + "population", "total_wages", "total_debt_due", + }, + logPrefix: "Features List", + }, + } + + for _, check := range checks { + cmd := exec.Command("kubectl", "exec", "deploy/"+feastDeploymentName, "-n", namespace, "-c", "online", "--") + cmd.Args = append(cmd.Args, check.command...) + output, err := Run(cmd, testDir) + ExpectWithOffset(1, err).NotTo(HaveOccurred()) + + fmt.Printf("%s:\n%s\n", check.logPrefix, string(output)) + VerifyOutputContains(output, check.expected) + } +} + +// asserts that all expected substrings are present in the given output. +func VerifyOutputContains(output []byte, expectedSubstrings []string) { + outputStr := string(output) + for _, expected := range expectedSubstrings { + Expect(outputStr).To(ContainSubstring(expected), fmt.Sprintf("Expected output to contain: %s", expected)) + } +} + +// patches and validate the FeatureStore CR's CronJob to execute model training and prediction +func TrainAndTestModel(namespace string, feastCRName string, feastDeploymentName string, testDir string) { + By("Patching FeatureStore with train/test commands") + patch := `{ + "spec": { + "cronJob": { + "containerConfigs": { + "commands": [ + "pip install -r ../requirements.txt", + "cd ../ && python run.py" + ] + } + } + } + }` + cmd := exec.Command("kubectl", "patch", "feast/"+feastCRName, "-n", namespace, "--type=merge", "--patch", patch) + _, cmdOutputErr := Run(cmd, testDir) + ExpectWithOffset(1, cmdOutputErr).NotTo(HaveOccurred()) + fmt.Println("Patched FeatureStore with train/test commands") + + By("Validating patch was applied correctly") + cmd = exec.Command("kubectl", "get", "feast/"+feastCRName, "-n", namespace, "-o", "jsonpath={.status.applied.cronJob.containerConfigs.commands}") + output, err := Run(cmd, testDir) + ExpectWithOffset(1, err).NotTo(HaveOccurred()) + outputStr := string(output) + Expect(outputStr).To(ContainSubstring("pip install -r ../requirements.txt")) + Expect(outputStr).To(ContainSubstring("python run.py")) + fmt.Print("FeatureStore patched correctly with commands", outputStr) + + By("Creating Job from CronJob") + CreateAndVerifyJobFromCron(namespace, feastDeploymentName, "feast-test-job", testDir, []string{"Loan rejected!"}) +} + +// Create a Job and verifies its logs contain expected substrings +func CreateAndVerifyJobFromCron(namespace, cronName, jobName, testDir string, expectedLogSubstrings []string) { + By(fmt.Sprintf("Creating Job %s from CronJob %s", jobName, cronName)) + cmd := exec.Command("kubectl", "create", "job", "--from=cronjob/"+cronName, jobName, "-n", namespace) + _, err := Run(cmd, testDir) + ExpectWithOffset(1, err).NotTo(HaveOccurred()) + + By("Waiting for Job completion") + cmd = exec.Command("kubectl", "wait", "--for=condition=complete", "--timeout=3m", "job/"+jobName, "-n", namespace) + _, err = Run(cmd, testDir) + ExpectWithOffset(1, err).NotTo(HaveOccurred()) + + By("Checking logs of completed job") + cmd = exec.Command("kubectl", "logs", "job/"+jobName, "-n", namespace, "--all-containers=true") + output, err := Run(cmd, testDir) + ExpectWithOffset(1, err).NotTo(HaveOccurred()) + + outputStr := string(output) + ansi := regexp.MustCompile(`\x1b\[[0-9;]*m`) + outputStr = ansi.ReplaceAllString(outputStr, "") + for _, expected := range expectedLogSubstrings { + Expect(outputStr).To(ContainSubstring(expected)) + } +} + +// verifies the specified deployment exists and is in the "Available" state. +func checkDeployment(namespace, name string) { + By(fmt.Sprintf("Waiting for %s deployment to become available", name)) + err := CheckIfDeploymentExistsAndAvailable(namespace, name, 2*Timeout) + Expect(err).ToNot(HaveOccurred(), fmt.Sprintf( + "Deployment %s is not available but expected to be.\nError: %v", name, err, + )) + fmt.Printf("Deployment %s is available\n", name) +} + +// validate that the status of the FeatureStore CR is "Ready". +func validateFeatureStoreCRStatus(namespace, crName string) { + cmd := exec.Command("kubectl", "get", "feast", crName, "-n", namespace, "-o", "jsonpath={.status.phase}") + output, err := cmd.Output() + Expect(err).ToNot(HaveOccurred(), "failed to get Feature Store CR status") + Expect(string(output)).To(Equal("Ready")) + fmt.Printf("Feature Store CR is in %s state\n", output) +} + +// validate the feature store yaml +func validateFeatureStoreYaml(namespace, deployment string) { + cmd := exec.Command("kubectl", "exec", "deploy/"+deployment, "-n", namespace, "-c", "online", "--", "cat", "feature_store.yaml") + output, err := cmd.CombinedOutput() + Expect(err).NotTo(HaveOccurred(), "Failed to read feature_store.yaml") + + content := string(output) + Expect(content).To(ContainSubstring("offline_store:\n type: duckdb")) + Expect(content).To(ContainSubstring("online_store:\n type: redis")) + Expect(content).To(ContainSubstring("registry_type: sql")) +} + +// apply and verifies the Feast deployment becomes available, the CR status is "Ready +func applyFeastYamlAndVerify(namespace string, testDir string, feastDeploymentName string, feastCRName string) { + By("Applying Feast yaml for secrets and Feature store CR") + cmd := exec.Command("kubectl", "apply", "-n", namespace, + "-f", "test/testdata/feast_integration_test_crs/feast.yaml") + _, err := Run(cmd, testDir) + ExpectWithOffset(1, err).NotTo(HaveOccurred()) + checkDeployment(namespace, feastDeploymentName) + + By("Verify Feature Store CR is in Ready state") + validateFeatureStoreCRStatus(namespace, feastCRName) + + By("Verifying that the Postgres DB contains the expected Feast tables") + cmd = exec.Command("kubectl", "exec", "deploy/postgres", "-n", namespace, "--", "psql", "-h", "localhost", "-U", "feast", "feast", "-c", `\dt`) + output, err := cmd.CombinedOutput() + Expect(err).NotTo(HaveOccurred(), fmt.Sprintf("Failed to get tables from Postgres. Output:\n%s", output)) + outputStr := string(output) + fmt.Println("Postgres Tables:\n", outputStr) + // List of expected tables + expectedTables := []string{ + "data_sources", "entities", "feast_metadata", "feature_services", "feature_views", + "managed_infra", "on_demand_feature_views", "permissions", "projects", + "saved_datasets", "stream_feature_views", "validation_references", + } + for _, table := range expectedTables { + Expect(outputStr).To(ContainSubstring(table), fmt.Sprintf("Expected table %q not found in output:\n%s", table, outputStr)) + } + + By("Verifying that the Feast repo was successfully cloned by the init container") + cmd = exec.Command("kubectl", "logs", "-f", "-n", namespace, "deploy/"+feastDeploymentName, "-c", "feast-init") + output, err = cmd.CombinedOutput() + Expect(err).NotTo(HaveOccurred(), fmt.Sprintf("Failed to get logs from init container. Output:\n%s", output)) + outputStr = string(output) + fmt.Println("Init Container Logs:\n", outputStr) + // Assert that the logs contain success indicators + Expect(outputStr).To(ContainSubstring("Feast repo creation complete"), "Expected Feast repo creation message not found") + + By("Verifying client feature_store.yaml for expected store types") + validateFeatureStoreYaml(namespace, feastDeploymentName) +}