diff --git a/.github/workflows/dbt-integration-tests.yml b/.github/workflows/dbt-integration-tests.yml index 3c7c4b6824b..dd54ab36665 100644 --- a/.github/workflows/dbt-integration-tests.yml +++ b/.github/workflows/dbt-integration-tests.yml @@ -13,6 +13,10 @@ on: - 'sdk/python/tests/unit/dbt/**' - '.github/workflows/dbt-integration-tests.yml' +concurrency: + group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} + cancel-in-progress: true + jobs: dbt-integration-test: if: diff --git a/.github/workflows/lint_pr.yml b/.github/workflows/lint_pr.yml index ec77fb038de..8cbfc78e21c 100644 --- a/.github/workflows/lint_pr.yml +++ b/.github/workflows/lint_pr.yml @@ -7,6 +7,10 @@ on: - edited - synchronize +concurrency: + group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} + cancel-in-progress: true + jobs: validate-title: if: diff --git a/.github/workflows/linter.yml b/.github/workflows/linter.yml index d81e3b483ab..19e13d5f8e9 100644 --- a/.github/workflows/linter.yml +++ b/.github/workflows/linter.yml @@ -2,6 +2,10 @@ name: linter on: [push, pull_request] +concurrency: + group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} + cancel-in-progress: true + jobs: lint-python: runs-on: [ubuntu-latest] diff --git a/.github/workflows/operator-e2e-integration-tests.yml b/.github/workflows/operator-e2e-integration-tests.yml index 41954daff68..bdaa3240702 100644 --- a/.github/workflows/operator-e2e-integration-tests.yml +++ b/.github/workflows/operator-e2e-integration-tests.yml @@ -13,6 +13,10 @@ on: paths: - 'infra/**' +concurrency: + group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} + cancel-in-progress: true + jobs: operator-e2e-tests: timeout-minutes: 40 diff --git a/.github/workflows/operator_pr.yml b/.github/workflows/operator_pr.yml index aefdcbdcbb4..8f08c91b890 100644 --- a/.github/workflows/operator_pr.yml +++ b/.github/workflows/operator_pr.yml @@ -1,6 +1,11 @@ name: operator-pr on: [pull_request] + +concurrency: + group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} + cancel-in-progress: true + jobs: operator-test: runs-on: ubuntu-latest diff --git a/.github/workflows/pr_duckdb_integration_tests.yml b/.github/workflows/pr_duckdb_integration_tests.yml index fa1dc44e860..d099d7fa582 100644 --- a/.github/workflows/pr_duckdb_integration_tests.yml +++ b/.github/workflows/pr_duckdb_integration_tests.yml @@ -7,6 +7,10 @@ on: - synchronize - labeled +concurrency: + group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} + cancel-in-progress: true + jobs: integration-test-duckdb-offline: if: diff --git a/.github/workflows/pr_integration_tests.yml b/.github/workflows/pr_integration_tests.yml index b8fe5747d66..d65e9807df1 100644 --- a/.github/workflows/pr_integration_tests.yml +++ b/.github/workflows/pr_integration_tests.yml @@ -7,10 +7,10 @@ on: - synchronize - labeled -# concurrency is currently broken, see details https://github.com/actions/runner/issues/1532 -#concurrency: -# group: pr-integration-tests-${{ github.event.pull_request.number }} -# cancel-in-progress: true +concurrency: + group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} + cancel-in-progress: true + permissions: actions: write pull-requests: read diff --git a/.github/workflows/pr_local_integration_tests.yml b/.github/workflows/pr_local_integration_tests.yml index 143cfe40973..b607c2c6b1e 100644 --- a/.github/workflows/pr_local_integration_tests.yml +++ b/.github/workflows/pr_local_integration_tests.yml @@ -8,6 +8,10 @@ on: - synchronize - labeled +concurrency: + group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} + cancel-in-progress: true + jobs: integration-test-python-local: if: diff --git a/.github/workflows/pr_ray_integration_tests.yml b/.github/workflows/pr_ray_integration_tests.yml index 61e279bb838..4d54c8e34ed 100644 --- a/.github/workflows/pr_ray_integration_tests.yml +++ b/.github/workflows/pr_ray_integration_tests.yml @@ -7,6 +7,10 @@ on: - synchronize - labeled +concurrency: + group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} + cancel-in-progress: true + jobs: integration-test-ray: if: diff --git a/.github/workflows/pr_registration_integration_tests.yml b/.github/workflows/pr_registration_integration_tests.yml index 9ee5d15a6a4..4085a320057 100644 --- a/.github/workflows/pr_registration_integration_tests.yml +++ b/.github/workflows/pr_registration_integration_tests.yml @@ -7,6 +7,10 @@ on: - synchronize - labeled +concurrency: + group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} + cancel-in-progress: true + permissions: actions: write pull-requests: read @@ -39,6 +43,16 @@ jobs: (github.event.action != 'labeled' && (contains(github.event.pull_request.labels.*.name, 'ok-to-test') || contains(github.event.pull_request.labels.*.name, 'approved') || contains(github.event.pull_request.labels.*.name, 'lgtm')))) && github.repository == 'feast-dev/feast' runs-on: ubuntu-latest + services: + redis: + image: redis + ports: + - 6379:6379 + options: >- + --health-cmd "redis-cli ping" + --health-interval 10s + --health-timeout 5s + --health-retries 5 steps: - uses: actions/checkout@v4 with: diff --git a/.github/workflows/pr_remote_rbac_integration_tests.yml b/.github/workflows/pr_remote_rbac_integration_tests.yml index 56c59a544c9..91cf0f4c564 100644 --- a/.github/workflows/pr_remote_rbac_integration_tests.yml +++ b/.github/workflows/pr_remote_rbac_integration_tests.yml @@ -8,6 +8,10 @@ on: - synchronize - labeled +concurrency: + group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} + cancel-in-progress: true + jobs: remote-rbac-integration-tests-python: if: diff --git a/.github/workflows/registry-rest-api-tests.yml b/.github/workflows/registry-rest-api-tests.yml index 229b6f5302c..bd50f613d0a 100644 --- a/.github/workflows/registry-rest-api-tests.yml +++ b/.github/workflows/registry-rest-api-tests.yml @@ -11,6 +11,10 @@ on: - synchronize - labeled +concurrency: + group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} + cancel-in-progress: true + jobs: registry-rest-api-tests: timeout-minutes: 30 diff --git a/.github/workflows/security.yml b/.github/workflows/security.yml index 75b1ea1686c..c12998fce57 100644 --- a/.github/workflows/security.yml +++ b/.github/workflows/security.yml @@ -8,6 +8,10 @@ on: schedule: - cron: "0 6 * * 1" +concurrency: + group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} + cancel-in-progress: true + jobs: codeql: name: CodeQL Analysis diff --git a/.github/workflows/smoke_tests.yml b/.github/workflows/smoke_tests.yml index 5f60dda4202..b183f6f47e9 100644 --- a/.github/workflows/smoke_tests.yml +++ b/.github/workflows/smoke_tests.yml @@ -3,6 +3,10 @@ name: smoke-tests on: pull_request: +concurrency: + group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} + cancel-in-progress: true + jobs: smoke-test-python: runs-on: ${{ matrix.os }} diff --git a/.github/workflows/unit_tests.yml b/.github/workflows/unit_tests.yml index 318346bb3ed..1311cd12635 100644 --- a/.github/workflows/unit_tests.yml +++ b/.github/workflows/unit_tests.yml @@ -6,6 +6,10 @@ on: branches: - master +concurrency: + group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} + cancel-in-progress: true + jobs: unit-test-python: runs-on: ${{ matrix.os }} diff --git a/Makefile b/Makefile index 93b4c027608..d0dcf797aee 100644 --- a/Makefile +++ b/Makefile @@ -184,7 +184,7 @@ test-python-smoke: ## Quick smoke test for development test-python-integration: ## Run Python integration tests (CI) uv run python -m pytest --tb=short -v -n 8 --integration --color=yes --durations=10 --timeout=1200 --timeout_method=thread --dist loadgroup \ -k "(not snowflake or not test_historical_features_main)" \ - -m "not rbac_remote_integration_test" \ + -m "not rbac_remote_integration_test and not ray_offline_stores_only" \ --ignore=sdk/python/tests/integration/registration \ --ignore=sdk/python/tests/integration/compute_engines/ray_compute \ --log-cli-level=INFO -s \ diff --git a/pixi.lock b/pixi.lock index 39d8edefcbf..fd0dc9d5636 100644 --- a/pixi.lock +++ b/pixi.lock @@ -1153,6 +1153,7 @@ environments: - pypi: https://files.pythonhosted.org/packages/b2/23/55d40e1bf54c141f541ab31b4b4b0f58610440c8837b1406f3467c2b4853/grpcio_testing-1.62.3-py3-none-any.whl - pypi: https://files.pythonhosted.org/packages/da/73/4ad5b1f6a2e21cf1e85afdaad2b7b1a933985e2f5d679147a1953aaa192c/gunicorn-25.1.0-py3-none-any.whl - pypi: https://files.pythonhosted.org/packages/04/4b/29cac41a4d98d144bf5f6d33995617b185d14b22401f75ca86f384e87ff1/h11-0.16.0-py3-none-any.whl + - pypi: https://files.pythonhosted.org/packages/42/6e/8adaefff7e3e216b0f7bd6cafce6d5d06798f31c3e2852dc3db6a7d758c9/hiredis-2.4.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl - pypi: https://files.pythonhosted.org/packages/7e/f5/f66802a942d491edb555dd61e3a9961140fd64c90bce1eafd741609d334d/httpcore-1.0.9-py3-none-any.whl - pypi: https://files.pythonhosted.org/packages/f5/71/b0a9193641d9e2471ac541d3b1b869538a5fb6419d52fd2669fa9c79e4b8/httptools-0.7.1-cp310-cp310-manylinux1_x86_64.manylinux_2_28_x86_64.manylinux_2_5_x86_64.whl - pypi: https://files.pythonhosted.org/packages/2a/39/e50c7c3a983047577ee07d2a9e53faf5a69493943ec3f6a384bdc792deb2/httpx-0.28.1-py3-none-any.whl @@ -1220,6 +1221,7 @@ environments: - pypi: https://files.pythonhosted.org/packages/2e/2e/dfbd2c9b3edf6a5a8cd9e66090221046839b488ea27824970426bf06b242/python_keycloak-4.2.2-py3-none-any.whl - pypi: https://files.pythonhosted.org/packages/81/c4/34e93fe5f5429d7570ec1fa436f1986fb1f00c3e0f43a589fe2bbcd22c3f/pytz-2025.2-py2.py3-none-any.whl - pypi: https://files.pythonhosted.org/packages/7a/1e/7acc4f0e74c4b3d9531e24739e0ab832a5edf40e64fbae1a9c01941cabd7/pyyaml-6.0.3-cp310-cp310-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl + - pypi: https://files.pythonhosted.org/packages/20/2e/409703d645363352a20c944f5d119bdae3eb3034051a53724a7c5fee12b8/redis-4.6.0-py3-none-any.whl - pypi: https://files.pythonhosted.org/packages/2c/58/ca301544e1fa93ed4f80d724bf5b194f6e4b945841c5bfd555878eea9fcb/referencing-0.37.0-py3-none-any.whl - pypi: https://files.pythonhosted.org/packages/1e/db/4254e3eabe8020b458f1a747140d32277ec7a271daf1d235b70dc0b4e6e3/requests-2.32.5-py3-none-any.whl - pypi: https://files.pythonhosted.org/packages/3b/5d/63d4ae3b9daea098d5d6f5da83984853c1bbacd5dc826764b249fe119d24/requests_oauthlib-2.0.0-py2.py3-none-any.whl @@ -1326,6 +1328,7 @@ environments: - pypi: https://files.pythonhosted.org/packages/b2/23/55d40e1bf54c141f541ab31b4b4b0f58610440c8837b1406f3467c2b4853/grpcio_testing-1.62.3-py3-none-any.whl - pypi: https://files.pythonhosted.org/packages/da/73/4ad5b1f6a2e21cf1e85afdaad2b7b1a933985e2f5d679147a1953aaa192c/gunicorn-25.1.0-py3-none-any.whl - pypi: https://files.pythonhosted.org/packages/04/4b/29cac41a4d98d144bf5f6d33995617b185d14b22401f75ca86f384e87ff1/h11-0.16.0-py3-none-any.whl + - pypi: https://files.pythonhosted.org/packages/d8/70/3f39ebfb3824578c34400df3b037b268abb5af0abaa789b430ffd17dd74e/hiredis-2.4.0-cp310-cp310-macosx_10_15_x86_64.whl - pypi: https://files.pythonhosted.org/packages/7e/f5/f66802a942d491edb555dd61e3a9961140fd64c90bce1eafd741609d334d/httpcore-1.0.9-py3-none-any.whl - pypi: https://files.pythonhosted.org/packages/c7/e5/c07e0bcf4ec8db8164e9f6738c048b2e66aabf30e7506f440c4cc6953f60/httptools-0.7.1-cp310-cp310-macosx_10_9_universal2.whl - pypi: https://files.pythonhosted.org/packages/2a/39/e50c7c3a983047577ee07d2a9e53faf5a69493943ec3f6a384bdc792deb2/httpx-0.28.1-py3-none-any.whl @@ -1393,6 +1396,7 @@ environments: - pypi: https://files.pythonhosted.org/packages/2e/2e/dfbd2c9b3edf6a5a8cd9e66090221046839b488ea27824970426bf06b242/python_keycloak-4.2.2-py3-none-any.whl - pypi: https://files.pythonhosted.org/packages/81/c4/34e93fe5f5429d7570ec1fa436f1986fb1f00c3e0f43a589fe2bbcd22c3f/pytz-2025.2-py2.py3-none-any.whl - pypi: https://files.pythonhosted.org/packages/f4/a0/39350dd17dd6d6c6507025c0e53aef67a9293a6d37d3511f23ea510d5800/pyyaml-6.0.3-cp310-cp310-macosx_10_13_x86_64.whl + - pypi: https://files.pythonhosted.org/packages/20/2e/409703d645363352a20c944f5d119bdae3eb3034051a53724a7c5fee12b8/redis-4.6.0-py3-none-any.whl - pypi: https://files.pythonhosted.org/packages/2c/58/ca301544e1fa93ed4f80d724bf5b194f6e4b945841c5bfd555878eea9fcb/referencing-0.37.0-py3-none-any.whl - pypi: https://files.pythonhosted.org/packages/1e/db/4254e3eabe8020b458f1a747140d32277ec7a271daf1d235b70dc0b4e6e3/requests-2.32.5-py3-none-any.whl - pypi: https://files.pythonhosted.org/packages/3b/5d/63d4ae3b9daea098d5d6f5da83984853c1bbacd5dc826764b249fe119d24/requests_oauthlib-2.0.0-py2.py3-none-any.whl @@ -1499,6 +1503,7 @@ environments: - pypi: https://files.pythonhosted.org/packages/b2/23/55d40e1bf54c141f541ab31b4b4b0f58610440c8837b1406f3467c2b4853/grpcio_testing-1.62.3-py3-none-any.whl - pypi: https://files.pythonhosted.org/packages/da/73/4ad5b1f6a2e21cf1e85afdaad2b7b1a933985e2f5d679147a1953aaa192c/gunicorn-25.1.0-py3-none-any.whl - pypi: https://files.pythonhosted.org/packages/04/4b/29cac41a4d98d144bf5f6d33995617b185d14b22401f75ca86f384e87ff1/h11-0.16.0-py3-none-any.whl + - pypi: https://files.pythonhosted.org/packages/ed/b7/26a56a3b991abe7fcf7bcfa8e0a08de3c3766c6caecb1ba46239342792ff/hiredis-2.4.0-cp310-cp310-macosx_11_0_arm64.whl - pypi: https://files.pythonhosted.org/packages/7e/f5/f66802a942d491edb555dd61e3a9961140fd64c90bce1eafd741609d334d/httpcore-1.0.9-py3-none-any.whl - pypi: https://files.pythonhosted.org/packages/7e/4f/35e3a63f863a659f92ffd92bef131f3e81cf849af26e6435b49bd9f6f751/httptools-0.7.1-cp310-cp310-macosx_11_0_arm64.whl - pypi: https://files.pythonhosted.org/packages/2a/39/e50c7c3a983047577ee07d2a9e53faf5a69493943ec3f6a384bdc792deb2/httpx-0.28.1-py3-none-any.whl @@ -1566,6 +1571,7 @@ environments: - pypi: https://files.pythonhosted.org/packages/2e/2e/dfbd2c9b3edf6a5a8cd9e66090221046839b488ea27824970426bf06b242/python_keycloak-4.2.2-py3-none-any.whl - pypi: https://files.pythonhosted.org/packages/81/c4/34e93fe5f5429d7570ec1fa436f1986fb1f00c3e0f43a589fe2bbcd22c3f/pytz-2025.2-py2.py3-none-any.whl - pypi: https://files.pythonhosted.org/packages/05/14/52d505b5c59ce73244f59c7a50ecf47093ce4765f116cdb98286a71eeca2/pyyaml-6.0.3-cp310-cp310-macosx_11_0_arm64.whl + - pypi: https://files.pythonhosted.org/packages/20/2e/409703d645363352a20c944f5d119bdae3eb3034051a53724a7c5fee12b8/redis-4.6.0-py3-none-any.whl - pypi: https://files.pythonhosted.org/packages/2c/58/ca301544e1fa93ed4f80d724bf5b194f6e4b945841c5bfd555878eea9fcb/referencing-0.37.0-py3-none-any.whl - pypi: https://files.pythonhosted.org/packages/1e/db/4254e3eabe8020b458f1a747140d32277ec7a271daf1d235b70dc0b4e6e3/requests-2.32.5-py3-none-any.whl - pypi: https://files.pythonhosted.org/packages/3b/5d/63d4ae3b9daea098d5d6f5da83984853c1bbacd5dc826764b249fe119d24/requests_oauthlib-2.0.0-py2.py3-none-any.whl @@ -2270,8 +2276,8 @@ packages: requires_python: '>=3.10' - pypi: ./ name: feast - version: 0.60.1.dev25+gedb2b0c39.d20260303 - sha256: 4cfc3caaa0403077a23deddd0daebd941e446c8876c7bd2850cfdb9ee78e9858 + version: 0.60.1.dev26+ge617b781e.d20260303 + sha256: 4f8327d7e74242a8e82fe839f22f10a6567272f48b4ddbe75f0cdcdd30b96dc4 requires_dist: - click>=7.0.0,<9.0.0 - colorama>=0.3.9,<1 @@ -3009,6 +3015,21 @@ packages: version: 0.16.0 sha256: 63cf8bbe7522de3bf65932fda1d9c2772064ffb3dae62d55932da54b31cb6c86 requires_python: '>=3.8' +- pypi: https://files.pythonhosted.org/packages/42/6e/8adaefff7e3e216b0f7bd6cafce6d5d06798f31c3e2852dc3db6a7d758c9/hiredis-2.4.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl + name: hiredis + version: 2.4.0 + sha256: 87a8ece3e893f45354395c6b9dc0479744c1c8c6ee4471b60945d96c9b5ce6c2 + requires_python: '>=3.8' +- pypi: https://files.pythonhosted.org/packages/d8/70/3f39ebfb3824578c34400df3b037b268abb5af0abaa789b430ffd17dd74e/hiredis-2.4.0-cp310-cp310-macosx_10_15_x86_64.whl + name: hiredis + version: 2.4.0 + sha256: 76503a0edaf3d1557518127511e69e5d9fa37b6ff15598b0d9d9c2db18b08a41 + requires_python: '>=3.8' +- pypi: https://files.pythonhosted.org/packages/ed/b7/26a56a3b991abe7fcf7bcfa8e0a08de3c3766c6caecb1ba46239342792ff/hiredis-2.4.0-cp310-cp310-macosx_11_0_arm64.whl + name: hiredis + version: 2.4.0 + sha256: b027b53adb1df11923753d85587e3ab611fe70bc69596e9eb3269acab809c376 + requires_python: '>=3.8' - pypi: https://files.pythonhosted.org/packages/7e/f5/f66802a942d491edb555dd61e3a9961140fd64c90bce1eafd741609d334d/httpcore-1.0.9-py3-none-any.whl name: httpcore version: 1.0.9 @@ -6364,6 +6385,19 @@ packages: purls: [] size: 313930 timestamp: 1765813902568 +- pypi: https://files.pythonhosted.org/packages/20/2e/409703d645363352a20c944f5d119bdae3eb3034051a53724a7c5fee12b8/redis-4.6.0-py3-none-any.whl + name: redis + version: 4.6.0 + sha256: e2b03db868160ee4591de3cb90d40ebb50a90dd302138775937f6a42b7ed183c + requires_dist: + - async-timeout>=4.0.2 ; python_full_version <= '3.11.2' + - importlib-metadata>=1.0 ; python_full_version < '3.8' + - typing-extensions ; python_full_version < '3.8' + - hiredis>=1.0.0 ; extra == 'hiredis' + - cryptography>=36.0.1 ; extra == 'ocsp' + - pyopenssl==20.0.1 ; extra == 'ocsp' + - requests>=2.26.0 ; extra == 'ocsp' + requires_python: '>=3.7' - pypi: https://files.pythonhosted.org/packages/2c/58/ca301544e1fa93ed4f80d724bf5b194f6e4b945841c5bfd555878eea9fcb/referencing-0.37.0-py3-none-any.whl name: referencing version: 0.37.0 diff --git a/pyproject.toml b/pyproject.toml index c2f691e3392..6607f05c677 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -293,7 +293,7 @@ test-compute = { cmd = "python -m pytest -v --integration sdk/python/tests/integ test = { depends-on = ["test-offline", "test-compute"] } [tool.pixi.feature.registration-tests.pypi-dependencies] -feast = { path = ".", editable = true, extras = ["aws", "gcp", "grpcio", "postgres", "mysql", "snowflake", "spark", "test"] } +feast = { path = ".", editable = true, extras = ["aws", "gcp", "grpcio", "postgres", "mysql", "redis", "snowflake", "spark", "test"] } grpcio-testing = ">=1.56.2,<=1.62.3" [tool.pixi.feature.registration-tests.tasks] diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py index 2c4bc8cdbc3..fac3439b911 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery.py +++ b/sdk/python/feast/infra/offline_stores/bigquery.py @@ -416,7 +416,7 @@ def offline_write_batch( ) if table.schema != pa_schema: - table = table.cast(pa_schema) + table = offline_utils.cast_arrow_table_to_schema(table, pa_schema) project_id = ( config.offline_store.billing_project_id or config.offline_store.project_id ) @@ -833,12 +833,17 @@ def arrow_schema_to_bq_schema(arrow_schema: pyarrow.Schema) -> List[SchemaField] bq_schema = [] for field in arrow_schema: - if pyarrow.types.is_list(field.type): + if pyarrow.types.is_struct(field.type) or pyarrow.types.is_map(field.type): + detected_mode = "NULLABLE" + detected_type = "STRING" + elif pyarrow.types.is_list(field.type): detected_mode = "REPEATED" - detected_type = _ARROW_SCALAR_IDS_TO_BQ[field.type.value_type.id] + detected_type = _ARROW_SCALAR_IDS_TO_BQ.get( + field.type.value_type.id, "STRING" + ) else: detected_mode = "NULLABLE" - detected_type = _ARROW_SCALAR_IDS_TO_BQ[field.type.id] + detected_type = _ARROW_SCALAR_IDS_TO_BQ.get(field.type.id, "STRING") bq_schema.append( SchemaField(name=field.name, field_type=detected_type, mode=detected_mode) diff --git a/sdk/python/feast/infra/offline_stores/offline_utils.py b/sdk/python/feast/infra/offline_stores/offline_utils.py index abd7ad4fe35..9825453d6a6 100644 --- a/sdk/python/feast/infra/offline_stores/offline_utils.py +++ b/sdk/python/feast/infra/offline_stores/offline_utils.py @@ -260,6 +260,37 @@ def get_pyarrow_schema_from_batch_source( return pa.schema(pa_schema), column_names +def cast_arrow_table_to_schema(table: pa.Table, pa_schema: pa.Schema) -> pa.Table: + """Cast a PyArrow table to match the target schema, handling struct/map → string. + + PyArrow cannot natively cast struct or map columns to string. When a + SQL-based offline store (BigQuery, Snowflake, Redshift) stores complex + Feast types (Map, Struct) as VARCHAR/STRING, the target schema will have + string fields while the input table may have struct/map fields (e.g. when + the caller provides Python dicts). This function serialises those columns + to JSON strings so the subsequent cast succeeds. + """ + import json as _json + + for i, field in enumerate(table.schema): + target_type = pa_schema.field(field.name).type + is_complex_source = pa.types.is_struct(field.type) or pa.types.is_map( + field.type + ) + is_string_target = pa.types.is_string(target_type) or pa.types.is_large_string( + target_type + ) + if is_complex_source and is_string_target: + col = table.column(i) + json_arr = pa.array( + [_json.dumps(v.as_py()) if v.is_valid else None for v in col], + type=target_type, + ) + table = table.set_column(i, field.name, json_arr) + + return table.cast(pa_schema) + + def enclose_in_backticks(value): # Check if the input is a list if isinstance(value, list): diff --git a/sdk/python/feast/infra/offline_stores/redshift.py b/sdk/python/feast/infra/offline_stores/redshift.py index 4ed8e6309c4..900dfcfab80 100644 --- a/sdk/python/feast/infra/offline_stores/redshift.py +++ b/sdk/python/feast/infra/offline_stores/redshift.py @@ -353,7 +353,7 @@ def offline_write_batch( ) if table.schema != pa_schema: - table = table.cast(pa_schema) + table = offline_utils.cast_arrow_table_to_schema(table, pa_schema) redshift_options = feature_view.batch_source.redshift_options redshift_client = aws_utils.get_redshift_data_client( diff --git a/sdk/python/feast/infra/offline_stores/snowflake.py b/sdk/python/feast/infra/offline_stores/snowflake.py index 1140d8562b2..7226c908d13 100644 --- a/sdk/python/feast/infra/offline_stores/snowflake.py +++ b/sdk/python/feast/infra/offline_stores/snowflake.py @@ -409,7 +409,7 @@ def offline_write_batch( ) if table.schema != pa_schema: - table = table.cast(pa_schema) + table = offline_utils.cast_arrow_table_to_schema(table, pa_schema) with GetSnowflakeConnection(config.offline_store) as conn: snowflake_conn = conn diff --git a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py index 86c2aaad0e6..fb812f82b7b 100644 --- a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py +++ b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py @@ -87,6 +87,10 @@ FEAST_PRIMITIVE_TO_MILVUS_TYPE_MAPPING[feast_type] = DataType.VARCHAR elif base_value_type == ValueType.BOOL: FEAST_PRIMITIVE_TO_MILVUS_TYPE_MAPPING[feast_type] = DataType.BINARY_VECTOR + elif isinstance(feast_type, ComplexFeastType): + milvus_type = PROTO_TO_MILVUS_TYPE_MAPPING.get(value_type) + if milvus_type: + FEAST_PRIMITIVE_TO_MILVUS_TYPE_MAPPING[feast_type] = milvus_type class MilvusOnlineStoreConfig(FeastConfigBaseModel, VectorStoreConfig): @@ -179,6 +183,8 @@ def _get_or_create_collection( fields_to_add = [f for f in table.schema if f.name not in fields_to_exclude] for field in fields_to_add: dtype = FEAST_PRIMITIVE_TO_MILVUS_TYPE_MAPPING.get(field.dtype) + if dtype is None and isinstance(field.dtype, ComplexFeastType): + dtype = DataType.VARCHAR if dtype: if dtype == DataType.FLOAT_VECTOR: fields.append( diff --git a/sdk/python/feast/infra/online_stores/remote.py b/sdk/python/feast/infra/online_stores/remote.py index 2670ee4887c..5b5b04c362d 100644 --- a/sdk/python/feast/infra/online_stores/remote.py +++ b/sdk/python/feast/infra/online_stores/remote.py @@ -70,6 +70,41 @@ class RemoteOnlineStore(OnlineStore): remote online store implementation wrapper to communicate with feast online server. """ + @staticmethod + def _proto_value_to_transport_value(proto_value: ValueProto) -> Any: + """ + Convert a proto Value to a JSON-serializable Python value suitable for + HTTP transport. Unlike ``feast_value_type_to_python_type``, this keeps + ``json_val`` as a raw string so the receiving server can reconstruct a + DataFrame whose column types match the original (string for JSON, dict + for Map/Struct). Parsing JSON strings into dicts would cause PyArrow to + infer a struct column on the server, which can crash with complex nested + types (lists inside dicts). + """ + val_attr = proto_value.WhichOneof("val") + if val_attr is None: + return None + + # Keep JSON values as raw strings for correct DataFrame reconstruction. + # Parsing them into dicts causes PyArrow to infer struct columns on the + # server whose nested lists round-trip as numpy arrays, breaking + # json.dumps during proto conversion. + if val_attr == "json_val": + return getattr(proto_value, val_attr) + if val_attr == "json_list_val": + return list(getattr(proto_value, val_attr).val) + + # Map/Struct types are converted to Python dicts by + # feast_value_type_to_python_type. Serialise them to JSON strings + # so the server-side DataFrame gets VARCHAR columns instead of + # PyArrow struct columns that can crash with complex nested types. + if val_attr in ("map_val", "struct_val"): + return json.dumps(feast_value_type_to_python_type(proto_value)) + if val_attr in ("map_list_val", "struct_list_val"): + return [json.dumps(v) for v in feast_value_type_to_python_type(proto_value)] + + return feast_value_type_to_python_type(proto_value) + def online_write_batch( self, config: RepoConfig, @@ -97,10 +132,11 @@ def online_write_batch( feast_value_type_to_python_type(entity_value_proto) ) - # Populate feature values + # Populate feature values – use transport-safe conversion that + # preserves JSON strings instead of parsing them into dicts. for feature_name, feature_value_proto in feature_values_proto.items(): columnar_data[feature_name].append( - feast_value_type_to_python_type(feature_value_proto) + self._proto_value_to_transport_value(feature_value_proto) ) # Populate timestamps diff --git a/sdk/python/tests/conftest.py b/sdk/python/tests/conftest.py index 2200cf1dcd1..beb3083f97c 100644 --- a/sdk/python/tests/conftest.py +++ b/sdk/python/tests/conftest.py @@ -435,6 +435,9 @@ def fake_ingest_data(): "conv_rate": [0.5], "acc_rate": [0.6], "avg_daily_trips": [4], + "driver_metadata": [None], + "driver_config": [None], + "driver_profile": [None], "event_timestamp": [pd.Timestamp(_utc_now()).round("ms")], "created": [pd.Timestamp(_utc_now()).round("ms")], } diff --git a/sdk/python/tests/integration/feature_repos/universal/data_source_creator.py b/sdk/python/tests/integration/feature_repos/universal/data_source_creator.py index 467db4dddce..6c0bc39b353 100644 --- a/sdk/python/tests/integration/feature_repos/universal/data_source_creator.py +++ b/sdk/python/tests/integration/feature_repos/universal/data_source_creator.py @@ -1,3 +1,4 @@ +import json from abc import ABC, abstractmethod from typing import Dict, Optional @@ -14,6 +15,29 @@ class DataSourceCreator(ABC): def __init__(self, project_name: str, *args, **kwargs): self.project_name = project_name + @staticmethod + def serialize_complex_columns(df: pd.DataFrame) -> pd.DataFrame: + """Serialize dict columns (Map/Struct types) to JSON strings. + + Backends like Snowflake, BigQuery, and Redshift cannot natively + ingest Python dicts via their bulk-load paths (VARIANT, STRUCT, + super types cause issues). Converting them to JSON strings lets + the data be stored as VARCHAR/STRING instead. + + List columns with primitive values (int, float, str, bool) are + left untouched since backends handle those as native ARRAY types. + """ + df = df.copy() + for col in df.columns: + if df[col].dropna().empty: + continue + sample = df[col].dropna().iloc[0] + if isinstance(sample, dict): + df[col] = df[col].apply( + lambda v: json.dumps(v) if v is not None else None + ) + return df + @abstractmethod def create_data_source( self, diff --git a/sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py index 4fcd9533e8e..c3e4e9bfa29 100644 --- a/sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py +++ b/sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py @@ -80,6 +80,7 @@ def create_data_source( # `BigQueryOfflineStore.offline_write_batch`, but since we're bypassing that API here, we should follow the same # rule. The schema of this initial dataframe determines the schema in the newly created BigQuery table. df = make_df_tzaware(df) + df = self.serialize_complex_columns(df) job = self.client.load_table_from_dataframe(df, destination_name) job.result() diff --git a/sdk/python/tests/integration/feature_repos/universal/data_sources/redshift.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/redshift.py index 91d1a74f071..386fdf1d30d 100644 --- a/sdk/python/tests/integration/feature_repos/universal/data_sources/redshift.py +++ b/sdk/python/tests/integration/feature_repos/universal/data_sources/redshift.py @@ -55,6 +55,7 @@ def create_data_source( ) -> DataSource: destination_name = self.get_prefixed_table_name(destination_name) + df = self.serialize_complex_columns(df) aws_utils.upload_df_to_redshift( self.client, self.offline_store_config.cluster_id, diff --git a/sdk/python/tests/integration/feature_repos/universal/data_sources/snowflake.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/snowflake.py index e9c4ad21a31..b6d611d5209 100644 --- a/sdk/python/tests/integration/feature_repos/universal/data_sources/snowflake.py +++ b/sdk/python/tests/integration/feature_repos/universal/data_sources/snowflake.py @@ -53,6 +53,7 @@ def create_data_source( ) -> DataSource: destination_name = self.get_prefixed_table_name(destination_name) + df = self.serialize_complex_columns(df) with GetSnowflakeConnection(self.offline_store_config) as conn: write_pandas(conn, df, destination_name, auto_create_table=True)