Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/dbt-integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ on:
- 'sdk/python/tests/unit/dbt/**'
- '.github/workflows/dbt-integration-tests.yml'

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true

jobs:
dbt-integration-test:
if:
Expand Down
4 changes: 4 additions & 0 deletions .github/workflows/lint_pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ on:
- edited
- synchronize

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true

jobs:
validate-title:
if:
Expand Down
4 changes: 4 additions & 0 deletions .github/workflows/linter.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ name: linter

on: [push, pull_request]

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true

jobs:
lint-python:
runs-on: [ubuntu-latest]
Expand Down
4 changes: 4 additions & 0 deletions .github/workflows/operator-e2e-integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ on:
paths:
- 'infra/**'

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true

jobs:
operator-e2e-tests:
timeout-minutes: 40
Expand Down
5 changes: 5 additions & 0 deletions .github/workflows/operator_pr.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
name: operator-pr

on: [pull_request]

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true

jobs:
operator-test:
runs-on: ubuntu-latest
Expand Down
4 changes: 4 additions & 0 deletions .github/workflows/pr_duckdb_integration_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ on:
- synchronize
- labeled

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true

jobs:
integration-test-duckdb-offline:
if:
Expand Down
8 changes: 4 additions & 4 deletions .github/workflows/pr_integration_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ on:
- synchronize
- labeled

# concurrency is currently broken, see details https://github.com/actions/runner/issues/1532
#concurrency:
# group: pr-integration-tests-${{ github.event.pull_request.number }}
# cancel-in-progress: true
concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true

permissions:
actions: write
pull-requests: read
Expand Down
4 changes: 4 additions & 0 deletions .github/workflows/pr_local_integration_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ on:
- synchronize
- labeled

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true

jobs:
integration-test-python-local:
if:
Expand Down
4 changes: 4 additions & 0 deletions .github/workflows/pr_ray_integration_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ on:
- synchronize
- labeled

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true

jobs:
integration-test-ray:
if:
Expand Down
14 changes: 14 additions & 0 deletions .github/workflows/pr_registration_integration_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ on:
- synchronize
- labeled

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true

permissions:
actions: write
pull-requests: read
Expand Down Expand Up @@ -39,6 +43,16 @@ jobs:
(github.event.action != 'labeled' && (contains(github.event.pull_request.labels.*.name, 'ok-to-test') || contains(github.event.pull_request.labels.*.name, 'approved') || contains(github.event.pull_request.labels.*.name, 'lgtm')))) &&
github.repository == 'feast-dev/feast'
runs-on: ubuntu-latest
services:
redis:
image: redis
ports:
- 6379:6379
options: >-
--health-cmd "redis-cli ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
steps:
- uses: actions/checkout@v4
with:
Expand Down
4 changes: 4 additions & 0 deletions .github/workflows/pr_remote_rbac_integration_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ on:
- synchronize
- labeled

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true

jobs:
remote-rbac-integration-tests-python:
if:
Expand Down
4 changes: 4 additions & 0 deletions .github/workflows/registry-rest-api-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ on:
- synchronize
- labeled

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true

jobs:
registry-rest-api-tests:
timeout-minutes: 30
Expand Down
4 changes: 4 additions & 0 deletions .github/workflows/security.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ on:
schedule:
- cron: "0 6 * * 1"

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true

jobs:
codeql:
name: CodeQL Analysis
Expand Down
4 changes: 4 additions & 0 deletions .github/workflows/smoke_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ name: smoke-tests
on:
pull_request:

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true

jobs:
smoke-test-python:
runs-on: ${{ matrix.os }}
Expand Down
4 changes: 4 additions & 0 deletions .github/workflows/unit_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ on:
branches:
- master

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true

jobs:
unit-test-python:
runs-on: ${{ matrix.os }}
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ test-python-smoke: ## Quick smoke test for development
test-python-integration: ## Run Python integration tests (CI)
uv run python -m pytest --tb=short -v -n 8 --integration --color=yes --durations=10 --timeout=1200 --timeout_method=thread --dist loadgroup \
-k "(not snowflake or not test_historical_features_main)" \
-m "not rbac_remote_integration_test" \
-m "not rbac_remote_integration_test and not ray_offline_stores_only" \
--ignore=sdk/python/tests/integration/registration \
--ignore=sdk/python/tests/integration/compute_engines/ray_compute \
--log-cli-level=INFO -s \
Expand Down
38 changes: 36 additions & 2 deletions pixi.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ test-compute = { cmd = "python -m pytest -v --integration sdk/python/tests/integ
test = { depends-on = ["test-offline", "test-compute"] }

[tool.pixi.feature.registration-tests.pypi-dependencies]
feast = { path = ".", editable = true, extras = ["aws", "gcp", "grpcio", "postgres", "mysql", "snowflake", "spark", "test"] }
feast = { path = ".", editable = true, extras = ["aws", "gcp", "grpcio", "postgres", "mysql", "redis", "snowflake", "spark", "test"] }
grpcio-testing = ">=1.56.2,<=1.62.3"

[tool.pixi.feature.registration-tests.tasks]
Expand Down
13 changes: 9 additions & 4 deletions sdk/python/feast/infra/offline_stores/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ def offline_write_batch(
)

if table.schema != pa_schema:
table = table.cast(pa_schema)
table = offline_utils.cast_arrow_table_to_schema(table, pa_schema)
project_id = (
config.offline_store.billing_project_id or config.offline_store.project_id
)
Expand Down Expand Up @@ -833,12 +833,17 @@ def arrow_schema_to_bq_schema(arrow_schema: pyarrow.Schema) -> List[SchemaField]
bq_schema = []

for field in arrow_schema:
if pyarrow.types.is_list(field.type):
if pyarrow.types.is_struct(field.type) or pyarrow.types.is_map(field.type):
detected_mode = "NULLABLE"
detected_type = "STRING"
elif pyarrow.types.is_list(field.type):
detected_mode = "REPEATED"
detected_type = _ARROW_SCALAR_IDS_TO_BQ[field.type.value_type.id]
detected_type = _ARROW_SCALAR_IDS_TO_BQ.get(
field.type.value_type.id, "STRING"
)
else:
detected_mode = "NULLABLE"
detected_type = _ARROW_SCALAR_IDS_TO_BQ[field.type.id]
detected_type = _ARROW_SCALAR_IDS_TO_BQ.get(field.type.id, "STRING")

bq_schema.append(
SchemaField(name=field.name, field_type=detected_type, mode=detected_mode)
Expand Down
31 changes: 31 additions & 0 deletions sdk/python/feast/infra/offline_stores/offline_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,37 @@ def get_pyarrow_schema_from_batch_source(
return pa.schema(pa_schema), column_names


def cast_arrow_table_to_schema(table: pa.Table, pa_schema: pa.Schema) -> pa.Table:
    """Cast ``table`` to ``pa_schema``, JSON-encoding struct/map columns bound for strings.

    PyArrow's ``Table.cast`` cannot convert struct or map columns directly to
    string. SQL-backed offline stores (BigQuery, Snowflake, Redshift) persist
    complex Feast types (Map, Struct) as VARCHAR/STRING, so the target schema
    may declare a string field where the input table carries a struct/map
    column (e.g. when callers supply Python dicts). Such columns are
    serialised to JSON strings first so the final cast succeeds.
    """
    import json

    for idx, src_field in enumerate(table.schema):
        dst_type = pa_schema.field(src_field.name).type
        complex_src = pa.types.is_struct(src_field.type) or pa.types.is_map(
            src_field.type
        )
        string_dst = pa.types.is_string(dst_type) or pa.types.is_large_string(
            dst_type
        )
        if not (complex_src and string_dst):
            continue
        # Nulls stay null; everything else becomes its JSON representation.
        encoded = pa.array(
            [
                None if value is None else json.dumps(value)
                for value in table.column(idx).to_pylist()
            ],
            type=dst_type,
        )
        table = table.set_column(idx, src_field.name, encoded)

    return table.cast(pa_schema)


def enclose_in_backticks(value):
# Check if the input is a list
if isinstance(value, list):
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/infra/offline_stores/redshift.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ def offline_write_batch(
)

if table.schema != pa_schema:
table = table.cast(pa_schema)
table = offline_utils.cast_arrow_table_to_schema(table, pa_schema)

redshift_options = feature_view.batch_source.redshift_options
redshift_client = aws_utils.get_redshift_data_client(
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/infra/offline_stores/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ def offline_write_batch(
)

if table.schema != pa_schema:
table = table.cast(pa_schema)
table = offline_utils.cast_arrow_table_to_schema(table, pa_schema)

with GetSnowflakeConnection(config.offline_store) as conn:
snowflake_conn = conn
Expand Down
Loading
Loading