diff --git a/.github/workflows/master_only.yml b/.github/workflows/master_only.yml index 51e3830fe6d..13cb341c616 100644 --- a/.github/workflows/master_only.yml +++ b/.github/workflows/master_only.yml @@ -158,15 +158,6 @@ jobs: SNOWFLAKE_CI_ROLE: ${{ secrets.SNOWFLAKE_CI_ROLE }} SNOWFLAKE_CI_WAREHOUSE: ${{ secrets.SNOWFLAKE_CI_WAREHOUSE }} run: pytest -n 8 --cov=./ --cov-report=xml --color=yes sdk/python/tests --integration --durations=5 --timeout=1200 --timeout_method=thread - - name: Upload coverage to Codecov - uses: codecov/codecov-action@v1 - with: - token: ${{ secrets.CODECOV_TOKEN }} - files: ./coverage.xml - flags: integrationtests - env_vars: OS,PYTHON - fail_ci_if_error: true - verbose: true - name: Benchmark python env: FEAST_SERVER_DOCKER_IMAGE_TAG: ${{ needs.build-lambda-docker-image.outputs.DOCKER_IMAGE_TAG }} diff --git a/.github/workflows/nightly-ci.yml b/.github/workflows/nightly-ci.yml index e1370b10b12..8eeb2f0e804 100644 --- a/.github/workflows/nightly-ci.yml +++ b/.github/workflows/nightly-ci.yml @@ -208,13 +208,4 @@ jobs: SNOWFLAKE_CI_PASSWORD: ${{ secrets.SNOWFLAKE_CI_PASSWORD }} SNOWFLAKE_CI_ROLE: ${{ secrets.SNOWFLAKE_CI_ROLE }} SNOWFLAKE_CI_WAREHOUSE: ${{ secrets.SNOWFLAKE_CI_WAREHOUSE }} - run: pytest -n 8 --cov=./ --cov-report=xml --color=yes sdk/python/tests --integration --durations=5 --timeout=1200 --timeout_method=thread - - name: Upload coverage to Codecov - uses: codecov/codecov-action@v1 - with: - token: ${{ secrets.CODECOV_TOKEN }} - files: ./coverage.xml - flags: integrationtests - env_vars: OS,PYTHON - fail_ci_if_error: true - verbose: true \ No newline at end of file + run: pytest -n 8 --cov=./ --cov-report=xml --color=yes sdk/python/tests --integration --durations=5 --timeout=1200 --timeout_method=thread \ No newline at end of file diff --git a/.github/workflows/pr_integration_tests.yml b/.github/workflows/pr_integration_tests.yml index ab8a79760f2..7c92c293a60 100644 --- a/.github/workflows/pr_integration_tests.yml +++ 
b/.github/workflows/pr_integration_tests.yml @@ -182,13 +182,4 @@ jobs: SNOWFLAKE_CI_PASSWORD: ${{ secrets.SNOWFLAKE_CI_PASSWORD }} SNOWFLAKE_CI_ROLE: ${{ secrets.SNOWFLAKE_CI_ROLE }} SNOWFLAKE_CI_WAREHOUSE: ${{ secrets.SNOWFLAKE_CI_WAREHOUSE }} - run: pytest -n 8 --cov=./ --cov-report=xml --color=yes sdk/python/tests --integration --durations=5 --timeout=1200 --timeout_method=thread - - name: Upload coverage to Codecov - uses: codecov/codecov-action@v1 - with: - token: ${{ secrets.CODECOV_TOKEN }} - files: ./coverage.xml - flags: integrationtests - env_vars: OS,PYTHON - fail_ci_if_error: true - verbose: true \ No newline at end of file + run: pytest -n 8 --cov=./ --cov-report=xml --color=yes sdk/python/tests --integration --durations=5 --timeout=1200 --timeout_method=thread \ No newline at end of file diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 0c9ac1e752c..db62766a1c7 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -42,10 +42,53 @@ jobs: echo "Current version is ${CURRENT_VERSION}" echo "Next version is ${NEXT_VERSION}" - publish-web-ui-npm: + validate_version_bumps: if: github.repository == 'feast-dev/feast' needs: get_dry_release_versions runs-on: ubuntu-latest + env: + # This publish is working using an NPM automation token to bypass 2FA + NPM_TOKEN: ${{ secrets.NPM_TOKEN }} + HELM_VERSION: v3.8.0 + CURRENT_VERSION: ${{ needs.get_dry_release_versions.outputs.current_version }} + NEXT_VERSION: ${{ needs.get_dry_release_versions.outputs.next_version }} + steps: + - uses: actions/checkout@v2 + - uses: actions/setup-node@v2 + with: + node-version: '17.x' + registry-url: 'https://registry.npmjs.org' + - name: Bump file versions + run: python ./infra/scripts/release/bump_file_versions.py ${CURRENT_VERSION} ${NEXT_VERSION} + - name: Install yarn dependencies + working-directory: ./ui + run: yarn install + - name: Build yarn rollup + working-directory: ./ui + run: yarn build:lib + - name: Bundle UI 
in SDK + run: make build-ui + - name: Remove previous Helm + run: sudo rm -rf $(which helm) + - name: Set up Homebrew + uses: Homebrew/actions/setup-homebrew@master + - name: Setup Helm-docs + run: | + brew install norwoodj/tap/helm-docs + - name: Generate helm chart READMEs + run: make build-helm-docs + - name: Install Helm + run: ./infra/scripts/helm/install-helm.sh + - name: Validate Helm chart prior to publishing + run: ./infra/scripts/helm/validate-helm-chart-publish.sh + - name: Validate all version consistency + run: ./infra/scripts/helm/validate-helm-chart-versions.sh $NEXT_VERSION + + + publish-web-ui-npm: + if: github.repository == 'feast-dev/feast' + needs: [validate_version_bumps, get_dry_release_versions] + runs-on: ubuntu-latest env: # This publish is working using an NPM automation token to bypass 2FA NPM_TOKEN: ${{ secrets.NPM_TOKEN }} diff --git a/.github/workflows/unit_tests.yml b/.github/workflows/unit_tests.yml index ebc09f0080e..ea8bef2e2fa 100644 --- a/.github/workflows/unit_tests.yml +++ b/.github/workflows/unit_tests.yml @@ -70,16 +70,6 @@ jobs: run: make install-python-ci-dependencies - name: Test Python run: pytest -n 8 --cov=./ --cov-report=xml --color=yes sdk/python/tests - - name: Upload coverage to Codecov - uses: codecov/codecov-action@v1 - if: github.repository == 'feast-dev/feast' - with: - token: ${{ secrets.CODECOV_TOKEN }} - files: ./coverage.xml - flags: unittests - env_vars: OS,PYTHON - fail_ci_if_error: true - verbose: true unit-test-go: runs-on: ubuntu-latest diff --git a/.gitignore b/.gitignore index 6a86eb2682b..e4e82bfce46 100644 --- a/.gitignore +++ b/.gitignore @@ -223,4 +223,4 @@ ui/.vercel **/yarn-error.log* # Go subprocess binaries (built during feast pip package building) -sdk/python/feast/binaries/ +sdk/python/feast/binaries/ \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index 65aa693691e..0b2f585587a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,28 @@ # Changelog +# 
[0.27.0](https://github.com/feast-dev/feast/compare/v0.26.0...v0.27.0) (2022-12-05) + + +### Bug Fixes + +* Changing Snowflake template code to avoid query not implemented … ([#3319](https://github.com/feast-dev/feast/issues/3319)) ([1590d6b](https://github.com/feast-dev/feast/commit/1590d6be14b9a9cea6fbde60f2373cb3cd68acb9)) +* Dask zero division error if parquet dataset has only one partition ([#3236](https://github.com/feast-dev/feast/issues/3236)) ([69e4a7d](https://github.com/feast-dev/feast/commit/69e4a7d10d912cd89f1e0c2a024d07cf2f44211e)) +* Enable Spark materialization on Yarn ([#3370](https://github.com/feast-dev/feast/issues/3370)) ([0c20a4e](https://github.com/feast-dev/feast/commit/0c20a4e6ac95d7c28299a95d91024be4370ef7ed)) +* Ensure that Snowflake accounts for number columns that overspecify precision ([#3306](https://github.com/feast-dev/feast/issues/3306)) ([0ad0ace](https://github.com/feast-dev/feast/commit/0ad0ace893edbd0680ee100726ad38bec1436974)) +* Fix memory leak from usage.py not properly cleaning up call stack ([#3371](https://github.com/feast-dev/feast/issues/3371)) ([a0c6fde](https://github.com/feast-dev/feast/commit/a0c6fde93bc8088d2bb34d1dd366a44b5d2657ee)) +* Fix workflow to contain env vars ([#3379](https://github.com/feast-dev/feast/issues/3379)) ([548bed9](https://github.com/feast-dev/feast/commit/548bed988687dc94147f494d4dbb4147fa5bea8e)) +* Update bytewax materialization ([#3368](https://github.com/feast-dev/feast/issues/3368)) ([4ebe00f](https://github.com/feast-dev/feast/commit/4ebe00fefa337cbe96ad3c712f44b9c9d7a46ef2)) +* Update the version counts ([#3378](https://github.com/feast-dev/feast/issues/3378)) ([8112db5](https://github.com/feast-dev/feast/commit/8112db5b5a18123da567c310fe385ae907edb56d)) +* Updated AWS Athena template ([#3322](https://github.com/feast-dev/feast/issues/3322)) ([5956981](https://github.com/feast-dev/feast/commit/595698105637aaeb952fddc2957c83e501964d2a)) +* Wrong UI data source type display 
([#3276](https://github.com/feast-dev/feast/issues/3276)) ([8f28062](https://github.com/feast-dev/feast/commit/8f280620bceb3a6e42ffffd0571eeb353b0feff2)) + + +### Features + +* Cassandra online store, concurrency in bulk write operations ([#3367](https://github.com/feast-dev/feast/issues/3367)) ([eaf354c](https://github.com/feast-dev/feast/commit/eaf354cde27c8f3a46ac256a71ad5ec06dfd70b3)) +* Cassandra online store, concurrent fetching for multiple entities ([#3356](https://github.com/feast-dev/feast/issues/3356)) ([00fa21f](https://github.com/feast-dev/feast/commit/00fa21f9ebef496a6801974cfd3a0a0b4861a11e)) +* Get Snowflake Query Output As Pyspark Dataframe ([#2504](https://github.com/feast-dev/feast/issues/2504)) ([#3358](https://github.com/feast-dev/feast/issues/3358)) ([2f18957](https://github.com/feast-dev/feast/commit/2f189572493cb498243bf7d6ec12d768f4b90ee7)) + # [0.26.0](https://github.com/feast-dev/feast/compare/v0.25.0...v0.26.0) (2022-10-06) diff --git a/Makefile b/Makefile index 718df6af4ab..8d9a1a8d3b3 100644 --- a/Makefile +++ b/Makefile @@ -461,7 +461,6 @@ build-templates: build-helm-docs: cd ${ROOT_DIR}/infra/charts/feast; helm-docs cd ${ROOT_DIR}/infra/charts/feast-feature-server; helm-docs - cd ${ROOT_DIR}/infra/charts/feast-python-server; helm-docs # Web UI diff --git a/README.md b/README.md index d57247c60b5..1152aa060e4 100644 --- a/README.md +++ b/README.md @@ -27,10 +27,10 @@ Feast allows ML platform teams to: * **Avoid data leakage** by generating point-in-time correct feature sets so data scientists can focus on feature engineering rather than debugging error-prone dataset joining logic. This ensure that future feature values do not leak to models during training. 
* **Decouple ML from data infrastructure** by providing a single data access layer that abstracts feature storage from feature retrieval, ensuring models remain portable as you move from training models to serving models, from batch models to realtime models, and from one data infra system to another. -Please see our [documentation](https://docs.feast.dev/) for more information about the project. +Please see our [documentation](https://docs.feast.dev/) for more information about the project, or sign up for an [email newsletter](https://feast.dev/). ## 📐 Architecture -![](docs/assets/feast-marchitecture.png) +![](docs/assets/feast_marchitecture.png) The above architecture is the minimal Feast deployment. Want to run the full Feast on Snowflake/GCP/AWS? Click [here](https://docs.feast.dev/how-to-guides/feast-snowflake-gcp-aws). @@ -44,7 +44,7 @@ pip install feast ### 2. Create a feature repository ```commandline feast init my_feature_repo -cd my_feature_repo +cd my_feature_repo/feature_repo ``` ### 3. 
Register your feature definitions and set up your feature store @@ -167,7 +167,7 @@ The list below contains the functionality that contributors are planning to deve * [x] [Trino (contrib plugin)](https://github.com/Shopify/feast-trino) * [x] [Spark (contrib plugin)](https://docs.feast.dev/reference/offline-stores/spark) * [x] [In-memory / Pandas](https://docs.feast.dev/reference/offline-stores/file) - * [x] [Custom offline store support](https://docs.feast.dev/how-to-guides/adding-a-new-offline-store) + * [x] [Custom offline store support](https://docs.feast.dev/how-to-guides/customizing-feast/adding-a-new-offline-store) * **Online Stores** * [x] [Snowflake](https://docs.feast.dev/reference/online-stores/snowflake) * [x] [DynamoDB](https://docs.feast.dev/reference/online-stores/dynamodb) @@ -178,18 +178,18 @@ The list below contains the functionality that contributors are planning to deve * [x] [Azure Cache for Redis (community plugin)](https://github.com/Azure/feast-azure) * [x] [Postgres (contrib plugin)](https://docs.feast.dev/reference/online-stores/postgres) * [x] [Cassandra / AstraDB (contrib plugin)](https://docs.feast.dev/reference/online-stores/cassandra) - * [x] [Custom online store support](https://docs.feast.dev/how-to-guides/adding-support-for-a-new-online-store) + * [x] [Custom online store support](https://docs.feast.dev/how-to-guides/customizing-feast/adding-support-for-a-new-online-store) * **Feature Engineering** * [x] On-demand Transformations (Alpha release. See [RFC](https://docs.google.com/document/d/1lgfIw0Drc65LpaxbUu49RCeJgMew547meSJttnUqz7c/edit#)) * [x] Streaming Transformations (Alpha release. See [RFC](https://docs.google.com/document/d/1UzEyETHUaGpn0ap4G82DHluiCj7zEbrQLkJJkKSv4e8/edit)) * [ ] Batch transformation (In progress. 
See [RFC](https://docs.google.com/document/d/1964OkzuBljifDvkV-0fakp2uaijnVzdwWNGdz7Vz50A/edit)) * **Streaming** - * [x] [Custom streaming ingestion job support](https://docs.feast.dev/how-to-guides/creating-a-custom-provider) + * [x] [Custom streaming ingestion job support](https://docs.feast.dev/how-to-guides/customizing-feast/creating-a-custom-provider) * [x] [Push based streaming data ingestion to online store](https://docs.feast.dev/reference/data-sources/push) * [x] [Push based streaming data ingestion to offline store](https://docs.feast.dev/reference/data-sources/push) * **Deployments** * [x] AWS Lambda (Alpha release. See [RFC](https://docs.google.com/document/d/1eZWKWzfBif66LDN32IajpaG-j82LSHCCOzY6R7Ax7MI/edit)) - * [x] Kubernetes (See [guide](https://docs.feast.dev/how-to-guides/running-feast-in-production#4.3.-java-based-feature-server-deployed-on-kubernetes)) + * [x] Kubernetes (See [guide](https://docs.feast.dev/how-to-guides/running-feast-in-production)) * **Feature Serving** * [x] Python Client * [x] [Python feature server](https://docs.feast.dev/reference/feature-servers/python-feature-server) diff --git a/docs/README.md b/docs/README.md index f387406c3fd..a305c4aecde 100644 --- a/docs/README.md +++ b/docs/README.md @@ -14,7 +14,7 @@ Feast allows ML platform teams to: **Note:** Feast today primarily addresses _timestamped structured data_. {% endhint %} -![](assets/feast-marchitecture.png) +![](assets/feast_marchitecture.png) ## Who is Feast for? 
diff --git a/docs/SUMMARY.md b/docs/SUMMARY.md index cb4a2664cd6..1bab8a61ef2 100644 --- a/docs/SUMMARY.md +++ b/docs/SUMMARY.md @@ -104,6 +104,8 @@ * [Batch Materialization Engines](reference/batch-materialization/README.md) * [Bytewax](reference/batch-materialization/bytewax.md) * [Snowflake](reference/batch-materialization/snowflake.md) + * [AWS Lambda (alpha)](reference/batch-materialization/lambda.md) + * [Spark (contrib)](reference/batch-materialization/spark.md) * [Feature repository](reference/feature-repository/README.md) * [feature\_store.yaml](reference/feature-repository/feature-store-yaml.md) * [.feastignore](reference/feature-repository/feast-ignore.md) diff --git a/docs/assets/feast-marchitecture.png b/docs/assets/feast-marchitecture.png deleted file mode 100644 index 0a7b044b098..00000000000 Binary files a/docs/assets/feast-marchitecture.png and /dev/null differ diff --git a/docs/assets/feast_marchitecture.png b/docs/assets/feast_marchitecture.png new file mode 100644 index 00000000000..2b162ec06e9 Binary files /dev/null and b/docs/assets/feast_marchitecture.png differ diff --git a/docs/getting-started/quickstart.md b/docs/getting-started/quickstart.md index 57b007707a3..b30bdb585c7 100644 --- a/docs/getting-started/quickstart.md +++ b/docs/getting-started/quickstart.md @@ -555,6 +555,7 @@ show up in the upcoming concepts + architecture + tutorial pages as well. ## Next steps +* Join the [email newsletter](https://feast.dev/) to get new updates on Feast / feature stores. * Read the [Concepts](concepts/) page to understand the Feast data model. * Read the [Architecture](architecture-and-components/) page. * Check out our [Tutorials](../tutorials/tutorials-overview/) section for more examples on how to use Feast. 
diff --git a/docs/how-to-guides/running-feast-in-production.md b/docs/how-to-guides/running-feast-in-production.md index ef903b68c4b..9d1984d7366 100644 --- a/docs/how-to-guides/running-feast-in-production.md +++ b/docs/how-to-guides/running-feast-in-production.md @@ -34,6 +34,8 @@ The first step to setting up a deployment of Feast is to create a Git repository Out of the box, Feast serializes all of its state into a file-based registry. When running Feast in production, we recommend using the more scalable SQL-based registry that is backed by a database. Details are available [here](./scaling-feast.md#scaling-feast-registry). +> **Note:** A SQL-based registry primarily works with a Python feature server. The Java feature server does not understand this registry type yet. + ### 1.3 Setting up CI/CD to automatically update the registry We recommend typically setting up CI/CD to automatically run `feast plan` and `feast apply` when pull requests are opened / merged. @@ -78,7 +80,7 @@ batch_engine: key: aws-secret-access-key ``` -### 2.2 Scheduled materialization +### 2.2 Scheduled materialization with Airflow > See also [data ingestion](../getting-started/concepts/data-ingestion.md#batch-data-ingestion) for code snippets @@ -91,34 +93,34 @@ However, the amount of work can quickly outgrow the resources of a single machin If you are using Airflow as a scheduler, Feast can be invoked through a [PythonOperator](https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/python.html) after the [Python SDK](https://pypi.org/project/feast/) has been installed into a virtual environment and your feature repo has been synced: ```python -import datetime -from airflow.operators.python_operator import PythonOperator +from airflow.decorators import task from feast import RepoConfig, FeatureStore from feast.infra.online_stores.dynamodb import DynamoDBOnlineStoreConfig from feast.repo_config import RegistryConfig # Define Python callable -def materialize(): +@task() +def 
materialize(data_interval_start=None, data_interval_end=None): repo_config = RepoConfig( registry=RegistryConfig(path="s3://[YOUR BUCKET]/registry.pb"), project="feast_demo_aws", provider="aws", offline_store="file", - online_store=DynamoDBOnlineStoreConfig(region="us-west-2") + online_store=DynamoDBOnlineStoreConfig(region="us-west-2"), + entity_key_serialization_version=2 ) store = FeatureStore(config=repo_config) # Option 1: materialize just one feature view # store.materialize_incremental(datetime.datetime.now(), feature_views=["my_fv_name"]) # Option 2: materialize all feature views incrementally - store.materialize_incremental(datetime.datetime.now()) - -# Use Airflow PythonOperator -materialize_python = PythonOperator( - task_id='materialize_python', - python_callable=materialize, -) + # store.materialize_incremental(datetime.datetime.now()) + # Option 3: Let Airflow manage materialization state + # Add 1 hr overlap to account for late data + store.materialize(data_interval_start.subtract(hours=1), data_interval_end) ``` +You can see more in an example at [Feast Workshop - Module 1](https://github.com/feast-dev/feast-workshop/blob/main/module_1/README.md#step-7-scaling-up-and-scheduling-materialization). + {% hint style="success" %} Important note: Airflow worker must have read and write permissions to the registry file on GCS / S3 since it pulls configuration and updates materialization history. {% endhint %} @@ -128,6 +130,8 @@ See more details at [data ingestion](../getting-started/concepts/data-ingestion. This supports pushing feature values into Feast to both online or offline stores. +### 2.4 Scheduled batch transformations with Airflow + dbt +Feast does not orchestrate batch transformation DAGs. For this, you can rely on tools like Airflow + dbt. See [Feast Workshop - Module 3](https://github.com/feast-dev/feast-workshop/blob/main/module_3/) for an example and some tips. ## 3. 
How to use Feast for model training @@ -238,7 +242,7 @@ helm install feast-release feast-charts/feast-feature-server \ --set feature_store_yaml_base64=$(base64 feature_store.yaml) ``` -This will deploy a single service. The service must have read access to the registry file on cloud storage. It will keep a copy of the registry in their memory and periodically refresh it, so expect some delays in update propagation in exchange for better performance. +This will deploy a single service. The service must have read access to the registry file on cloud storage and to the online store (e.g. via [podAnnotations](https://kubernetes-on-aws.readthedocs.io/en/latest/user-guide/iam-roles.html)). It will keep a copy of the registry in its memory and periodically refresh it, so expect some delays in update propagation in exchange for better performance. ## 5. Using environment variables in your yaml configuration @@ -272,7 +276,7 @@ In summary, the overall architecture in production may look like: * Feast SDK is being triggered by CI (eg, Github Actions). It applies the latest changes from the feature repo to the Feast database-backed registry * Data ingestion - * **Batch data**: Airflow manages materialization jobs to ingest batch data from DWH to the online store periodically. When working with large datasets to materialize, we recommend using a batch materialization engine + * **Batch data**: Airflow manages batch transformation jobs + materialization jobs to ingest batch data from DWH to the online store periodically. When working with large datasets to materialize, we recommend using a batch materialization engine * If your offline and online workloads are in Snowflake, the Snowflake materialization engine is likely the best option. * If your offline and online workloads are not using Snowflake, but using Kubernetes is an option, the Bytewax materialization engine is likely the best option.
* If none of these engines suite your needs, you may continue using the in-process engine, or write a custom engine (e.g with Spark or Ray). diff --git a/docs/reference/batch-materialization/README.md b/docs/reference/batch-materialization/README.md index 50640bce49c..8511fd81d0b 100644 --- a/docs/reference/batch-materialization/README.md +++ b/docs/reference/batch-materialization/README.md @@ -5,3 +5,7 @@ Please see [Batch Materialization Engine](../../getting-started/architecture-and {% page-ref page="snowflake.md" %} {% page-ref page="bytewax.md" %} + +{% page-ref page="lambda.md" %} + +{% page-ref page="spark.md" %} diff --git a/docs/reference/batch-materialization/lambda.md b/docs/reference/batch-materialization/lambda.md new file mode 100644 index 00000000000..126d07c4103 --- /dev/null +++ b/docs/reference/batch-materialization/lambda.md @@ -0,0 +1,24 @@ +# AWS Lambda (alpha) + +## Description + +The AWS Lambda batch materialization engine is considered alpha status. It relies on the offline store to output feature values to S3 via `to_remote_storage`, and then loads them into the online store. + +See [LambdaMaterializationEngineConfig](https://rtd.feast.dev/en/master/index.html?highlight=LambdaMaterializationEngine#feast.infra.materialization.aws_lambda.lambda_engine.LambdaMaterializationEngineConfig) for configuration options. + +See also [Dockerfile](https://github.com/feast-dev/feast/blob/master/sdk/python/feast/infra/materialization/aws_lambda/Dockerfile) for a Dockerfile that can be used below with `materialization_image`. + +## Example + +{% code title="feature_store.yaml" %} +```yaml +... +offline_store: + type: snowflake.offline +... 
+batch_engine: + type: lambda + lambda_role: [your iam role] + materialization_image: [image uri of above Docker image] +``` +{% endcode %} diff --git a/docs/reference/batch-materialization/spark.md b/docs/reference/batch-materialization/spark.md new file mode 100644 index 00000000000..27a1388c48e --- /dev/null +++ b/docs/reference/batch-materialization/spark.md @@ -0,0 +1,55 @@ +# Spark (alpha) + +## Description + +The Spark batch materialization engine is considered alpha status. It relies on the offline store to output feature values to S3 via `to_remote_storage`, and then loads them into the online store. + +See [SparkMaterializationEngineConfig](https://rtd.feast.dev/en/master/index.html?highlight=SparkMaterializationEngine#feast.infra.materialization.spark.spark_materialization_engine.SparkMaterializationEngineConfig) for configuration options. + +## Example + +{% code title="feature_store.yaml" %} +```yaml +... +offline_store: + type: snowflake.offline +... +batch_engine: + type: spark.engine + partitions: [optional num partitions to use to write to online store] +``` +{% endcode %} + +## Example in Python + +{% code title="feature_store.py" %} +```python +from feast import FeatureStore, RepoConfig +from feast.repo_config import RegistryConfig +from feast.infra.online_stores.dynamodb import DynamoDBOnlineStoreConfig +from feast.infra.offline_stores.contrib.spark_offline_store.spark import SparkOfflineStoreConfig + +repo_config = RepoConfig( + registry="s3://[YOUR_BUCKET]/feast-registry.db", + project="feast_repo", + provider="aws", + offline_store=SparkOfflineStoreConfig( + spark_conf={ + "spark.ui.enabled": "false", + "spark.eventLog.enabled": "false", + "spark.sql.catalogImplementation": "hive", + "spark.sql.parser.quotedRegexColumnNames": "true", + "spark.sql.session.timeZone": "UTC" + } + ), + batch_engine={ + "type": "spark.engine", + "partitions": 10 + }, + online_store=DynamoDBOnlineStoreConfig(region="us-west-1"), + entity_key_serialization_version=2 +) +
+store = FeatureStore(config=repo_config) +``` +{% endcode %} \ No newline at end of file diff --git a/docs/reference/feature-servers/python-feature-server.md b/docs/reference/feature-servers/python-feature-server.md index d18bdc4f42e..c189f97ae03 100644 --- a/docs/reference/feature-servers/python-feature-server.md +++ b/docs/reference/feature-servers/python-feature-server.md @@ -157,20 +157,22 @@ curl -X POST \ The Python feature server also exposes an endpoint for [push sources](../../data-sources/push.md). This endpoint allows you to push data to the online and/or offline store. -The request definition for pushmode is a string parameter `to` where the options are: \["online", "offline", "online\_and\_offline"]. Note that timestamps need to be strings. +The request definition for `PushMode` is a string parameter `to` where the options are: \[`"online"`, `"offline"`, `"online_and_offline"`]. + +**Note:** timestamps need to be strings, and might need to be timezone aware (matching the schema of the offline store) ``` curl -X POST "http://localhost:6566/push" -d '{ - "push_source_name": "driver_hourly_stats_push_source", + "push_source_name": "driver_stats_push_source", "df": { "driver_id": [1001], - "event_timestamp": ["2022-05-13 10:59:42"], + "event_timestamp": ["2022-05-13 10:59:42+00:00"], "created": ["2022-05-13 10:59:42"], "conv_rate": [1.0], "acc_rate": [1.0], "avg_daily_trips": [1000] }, - "to": "online_and_offline", + "to": "online_and_offline" }' | jq ``` @@ -179,7 +181,6 @@ or equivalently from Python: ```python import json import requests -import pandas as pd from datetime import datetime event_dict = { diff --git a/docs/reference/online-stores/cassandra.md b/docs/reference/online-stores/cassandra.md index e012ad250ae..30514305b6a 100644 --- a/docs/reference/online-stores/cassandra.md +++ b/docs/reference/online-stores/cassandra.md @@ -32,6 +32,8 @@ online_store: load_balancing: # optional local_dc: 'datacenter1' # optional load_balancing_policy: 
'TokenAwarePolicy(DCAwareRoundRobinPolicy)' # optional + read_concurrency: 100 # optional + write_concurrency: 100 # optional ``` {% endcode %} @@ -52,7 +54,8 @@ online_store: load_balancing: # optional local_dc: 'eu-central-1' # optional load_balancing_policy: 'TokenAwarePolicy(DCAwareRoundRobinPolicy)' # optional - + read_concurrency: 100 # optional + write_concurrency: 100 # optional ``` {% endcode %} diff --git a/docs/reference/online-stores/redis.md b/docs/reference/online-stores/redis.md index 2078ee16b95..c08cef2a3e1 100644 --- a/docs/reference/online-stores/redis.md +++ b/docs/reference/online-stores/redis.md @@ -45,6 +45,23 @@ online_store: ``` {% endcode %} +Additionally, the Redis online store supports automatically deleting data via a TTL mechanism. +The TTL is applied at the entity level, so feature values from any associated feature views for an entity are removed together. +This TTL can be set in the `feature_store.yaml`, using the `key_ttl_seconds` field in the online store. For example: + +{% code title="feature_store.yaml" %} +```yaml +project: my_feature_repo +registry: data/registry.db +provider: local +online_store: + type: redis + key_ttl_seconds: 604800 + connection_string: "localhost:6379" +``` +{% endcode %} + + The full set of configuration options is available in [RedisOnlineStoreConfig](https://rtd.feast.dev/en/latest/#feast.infra.online_stores.redis.RedisOnlineStoreConfig).
## Functionality Matrix diff --git a/docs/roadmap.md b/docs/roadmap.md index cea646a8f5f..e75e58849bf 100644 --- a/docs/roadmap.md +++ b/docs/roadmap.md @@ -25,7 +25,7 @@ The list below contains the functionality that contributors are planning to deve * [x] [Trino (contrib plugin)](https://github.com/Shopify/feast-trino) * [x] [Spark (contrib plugin)](https://docs.feast.dev/reference/offline-stores/spark) * [x] [In-memory / Pandas](https://docs.feast.dev/reference/offline-stores/file) - * [x] [Custom offline store support](https://docs.feast.dev/how-to-guides/adding-a-new-offline-store) + * [x] [Custom offline store support](https://docs.feast.dev/how-to-guides/customizing-feast/adding-a-new-offline-store) * **Online Stores** * [x] [Snowflake](https://docs.feast.dev/reference/online-stores/snowflake) * [x] [DynamoDB](https://docs.feast.dev/reference/online-stores/dynamodb) @@ -36,18 +36,18 @@ The list below contains the functionality that contributors are planning to deve * [x] [Azure Cache for Redis (community plugin)](https://github.com/Azure/feast-azure) * [x] [Postgres (contrib plugin)](https://docs.feast.dev/reference/online-stores/postgres) * [x] [Cassandra / AstraDB (contrib plugin)](https://docs.feast.dev/reference/online-stores/cassandra) - * [x] [Custom online store support](https://docs.feast.dev/how-to-guides/adding-support-for-a-new-online-store) + * [x] [Custom online store support](https://docs.feast.dev/how-to-guides/customizing-feast/adding-support-for-a-new-online-store) * **Feature Engineering** * [x] On-demand Transformations (Alpha release. See [RFC](https://docs.google.com/document/d/1lgfIw0Drc65LpaxbUu49RCeJgMew547meSJttnUqz7c/edit#)) * [x] Streaming Transformations (Alpha release. See [RFC](https://docs.google.com/document/d/1UzEyETHUaGpn0ap4G82DHluiCj7zEbrQLkJJkKSv4e8/edit)) * [ ] Batch transformation (In progress. 
See [RFC](https://docs.google.com/document/d/1964OkzuBljifDvkV-0fakp2uaijnVzdwWNGdz7Vz50A/edit)) * **Streaming** - * [x] [Custom streaming ingestion job support](https://docs.feast.dev/how-to-guides/creating-a-custom-provider) + * [x] [Custom streaming ingestion job support](https://docs.feast.dev/how-to-guides/customizing-feast/creating-a-custom-provider) * [x] [Push based streaming data ingestion to online store](https://docs.feast.dev/reference/data-sources/push) * [x] [Push based streaming data ingestion to offline store](https://docs.feast.dev/reference/data-sources/push) * **Deployments** * [x] AWS Lambda (Alpha release. See [RFC](https://docs.google.com/document/d/1eZWKWzfBif66LDN32IajpaG-j82LSHCCOzY6R7Ax7MI/edit)) - * [x] Kubernetes (See [guide](https://docs.feast.dev/how-to-guides/running-feast-in-production#4.3.-java-based-feature-server-deployed-on-kubernetes)) + * [x] Kubernetes (See [guide](https://docs.feast.dev/how-to-guides/running-feast-in-production)) * **Feature Serving** * [x] Python Client * [x] [Python feature server](https://docs.feast.dev/reference/feature-servers/python-feature-server) diff --git a/infra/charts/feast-feature-server/Chart.yaml b/infra/charts/feast-feature-server/Chart.yaml index 252dceb9f6f..55041c2b8dd 100644 --- a/infra/charts/feast-feature-server/Chart.yaml +++ b/infra/charts/feast-feature-server/Chart.yaml @@ -2,7 +2,7 @@ apiVersion: v2 name: feast-feature-server description: Feast Feature Server in Go or Python type: application -version: 0.26.0 +version: 0.27.0 keywords: - machine learning - big data diff --git a/infra/charts/feast-feature-server/README.md b/infra/charts/feast-feature-server/README.md index 0ae45328598..f023021f963 100644 --- a/infra/charts/feast-feature-server/README.md +++ b/infra/charts/feast-feature-server/README.md @@ -1,6 +1,6 @@ # Feast Python / Go Feature Server Helm Charts -Current chart version is `0.26.0` +Current chart version is `0.27.0` ## Installation @@ -30,7 +30,7 @@ See 
[here](https://github.com/feast-dev/feast/tree/master/examples/python-helm-d | fullnameOverride | string | `""` | | | image.pullPolicy | string | `"IfNotPresent"` | | | image.repository | string | `"feastdev/feature-server"` | Docker image for Feature Server repository | -| image.tag | string | `"0.26.0"` | The Docker image tag (can be overwritten if custom feature server deps are needed for on demand transforms) | +| image.tag | string | `"0.27.0"` | The Docker image tag (can be overwritten if custom feature server deps are needed for on demand transforms) | | imagePullSecrets | list | `[]` | | | livenessProbe.initialDelaySeconds | int | `30` | | | livenessProbe.periodSeconds | int | `30` | | diff --git a/infra/charts/feast-feature-server/values.yaml b/infra/charts/feast-feature-server/values.yaml index d897346efe4..c66c45e36dc 100644 --- a/infra/charts/feast-feature-server/values.yaml +++ b/infra/charts/feast-feature-server/values.yaml @@ -9,7 +9,7 @@ image: repository: feastdev/feature-server pullPolicy: IfNotPresent # image.tag -- The Docker image tag (can be overwritten if custom feature server deps are needed for on demand transforms) - tag: 0.26.0 + tag: 0.27.0 imagePullSecrets: [] nameOverride: "" diff --git a/infra/charts/feast-python-server/.helmignore b/infra/charts/feast-python-server/.helmignore deleted file mode 100644 index 0e8a0eb36f4..00000000000 --- a/infra/charts/feast-python-server/.helmignore +++ /dev/null @@ -1,23 +0,0 @@ -# Patterns to ignore when building packages. -# This supports shell glob matching, relative path matching, and -# negation (prefixed with !). Only one pattern per line. 
-.DS_Store -# Common VCS dirs -.git/ -.gitignore -.bzr/ -.bzrignore -.hg/ -.hgignore -.svn/ -# Common backup files -*.swp -*.bak -*.tmp -*.orig -*~ -# Various IDEs -.project -.idea/ -*.tmproj -.vscode/ diff --git a/infra/charts/feast-python-server/Chart.yaml b/infra/charts/feast-python-server/Chart.yaml deleted file mode 100644 index 0c7690deb27..00000000000 --- a/infra/charts/feast-python-server/Chart.yaml +++ /dev/null @@ -1,12 +0,0 @@ -apiVersion: v2 -name: feast-python-server -description: Feast Feature Server in Python -type: application -version: 0.26.0 -keywords: - - machine learning - - big data - - mlops -home: https://feast.dev/ -sources: - - https://github.com/feast-dev/feast diff --git a/infra/charts/feast-python-server/README.md b/infra/charts/feast-python-server/README.md deleted file mode 100644 index ab2de1d3a83..00000000000 --- a/infra/charts/feast-python-server/README.md +++ /dev/null @@ -1,50 +0,0 @@ -# Feast Python Feature Server Helm Charts (deprecated) - -> Note: this helm chart is deprecated in favor of [feast-feature-server](../feast-feature-server/README.md) - -Current chart version is `0.26.0` - -## Installation -Docker repository and tag are required. Helm install example: -``` -helm install feast-python-server . --set image.repository=REPO --set image.tag=TAG -``` - -Deployment assumes that `feature_store.yaml` exists on docker image. 
Example docker image: -``` -FROM python:3.8 - -RUN apt update && \ - apt install -y jq - -RUN pip install pip --upgrade - -RUN pip install feast - -COPY feature_store.yaml /feature_store.yaml -``` - -## Values - -| Key | Type | Default | Description | -|-----|------|---------|-------------| -| affinity | object | `{}` | | -| fullnameOverride | string | `""` | | -| image.pullPolicy | string | `"IfNotPresent"` | | -| image.repository | string | `""` | [required] The repository for the Docker image | -| image.tag | string | `""` | [required] The Docker image tag | -| imagePullSecrets | list | `[]` | | -| livenessProbe.initialDelaySeconds | int | `30` | | -| livenessProbe.periodSeconds | int | `30` | | -| nameOverride | string | `""` | | -| nodeSelector | object | `{}` | | -| podAnnotations | object | `{}` | | -| podSecurityContext | object | `{}` | | -| readinessProbe.initialDelaySeconds | int | `20` | | -| readinessProbe.periodSeconds | int | `10` | | -| replicaCount | int | `1` | | -| resources | object | `{}` | | -| securityContext | object | `{}` | | -| service.port | int | `80` | | -| service.type | string | `"ClusterIP"` | | -| tolerations | list | `[]` | | \ No newline at end of file diff --git a/infra/charts/feast-python-server/README.md.gotmpl b/infra/charts/feast-python-server/README.md.gotmpl deleted file mode 100644 index cb264c0066a..00000000000 --- a/infra/charts/feast-python-server/README.md.gotmpl +++ /dev/null @@ -1,29 +0,0 @@ -# Feast Python Feature Server Helm Charts (deprecated) - -> Note: this helm chart is deprecated in favor of [feast-feature-server](../feast-feature-server/README.md) - -Current chart version is `{{ template "chart.version" . }}` - -## Installation -Docker repository and tag are required. Helm install example: -``` -helm install feast-python-server . --set image.repository=REPO --set image.tag=TAG -``` - -Deployment assumes that `feature_store.yaml` exists on docker image. 
Example docker image: -``` -FROM python:3.8 - -RUN apt update && \ - apt install -y jq - -RUN pip install pip --upgrade - -RUN pip install feast - -COPY feature_store.yaml /feature_store.yaml -``` - -{{ template "chart.requirementsSection" . }} - -{{ template "chart.valuesSection" . }} \ No newline at end of file diff --git a/infra/charts/feast-python-server/templates/_helpers.tpl b/infra/charts/feast-python-server/templates/_helpers.tpl deleted file mode 100644 index b64e10536d1..00000000000 --- a/infra/charts/feast-python-server/templates/_helpers.tpl +++ /dev/null @@ -1,52 +0,0 @@ -{{/* vim: set filetype=mustache: */}} -{{/* -Expand the name of the chart. -*/}} -{{- define "feast-python-server.name" -}} -{{- default .Chart.Name .Values.nameOverride | trunc 63 | trimSuffix "-" }} -{{- end }} - -{{/* -Create a default fully qualified app name. -We truncate at 63 chars because some Kubernetes name fields are limited to this (by the DNS naming spec). -If release name contains chart name it will be used as a full name. -*/}} -{{- define "feast-python-server.fullname" -}} -{{- if .Values.fullnameOverride }} -{{- .Values.fullnameOverride | trunc 63 | trimSuffix "-" }} -{{- else }} -{{- $name := default .Chart.Name .Values.nameOverride }} -{{- if contains $name .Release.Name }} -{{- .Release.Name | trunc 63 | trimSuffix "-" }} -{{- else }} -{{- printf "%s-%s" .Release.Name $name | trunc 63 | trimSuffix "-" }} -{{- end }} -{{- end }} -{{- end }} - -{{/* -Create chart name and version as used by the chart label. -*/}} -{{- define "feast-python-server.chart" -}} -{{- printf "%s-%s" .Chart.Name .Chart.Version | replace "+" "_" | trunc 63 | trimSuffix "-" }} -{{- end }} - -{{/* -Common labels -*/}} -{{- define "feast-python-server.labels" -}} -helm.sh/chart: {{ include "feast-python-server.chart" . }} -{{ include "feast-python-server.selectorLabels" . 
}} -{{- if .Chart.AppVersion }} -app.kubernetes.io/version: {{ .Chart.AppVersion | quote }} -{{- end }} -app.kubernetes.io/managed-by: {{ .Release.Service }} -{{- end }} - -{{/* -Selector labels -*/}} -{{- define "feast-python-server.selectorLabels" -}} -app.kubernetes.io/name: {{ include "feast-python-server.name" . }} -app.kubernetes.io/instance: {{ .Release.Name }} -{{- end }} diff --git a/infra/charts/feast-python-server/templates/deployment.yaml b/infra/charts/feast-python-server/templates/deployment.yaml deleted file mode 100644 index 56439be658c..00000000000 --- a/infra/charts/feast-python-server/templates/deployment.yaml +++ /dev/null @@ -1,61 +0,0 @@ -apiVersion: apps/v1 -kind: Deployment -metadata: - name: {{ include "feast-python-server.fullname" . }} - labels: - {{- include "feast-python-server.labels" . | nindent 4 }} -spec: - replicas: {{ .Values.replicaCount }} - selector: - matchLabels: - {{- include "feast-python-server.selectorLabels" . | nindent 6 }} - template: - metadata: - {{- with .Values.podAnnotations }} - annotations: - {{- toYaml . | nindent 8 }} - {{- end }} - labels: - {{- include "feast-python-server.selectorLabels" . | nindent 8 }} - spec: - {{- with .Values.imagePullSecrets }} - imagePullSecrets: - {{- toYaml . 
| nindent 8 }} - {{- end }} - securityContext: - {{- toYaml .Values.podSecurityContext | nindent 8 }} - containers: - - name: {{ .Chart.Name }} - securityContext: - {{- toYaml .Values.securityContext | nindent 12 }} - image: "{{ .Values.image.repository }}:{{ .Values.image.tag }}" - imagePullPolicy: {{ .Values.image.pullPolicy }} - command: ["feast", "serve", "-h", "0.0.0.0"] - ports: - - name: http - containerPort: 6566 - protocol: TCP - livenessProbe: - tcpSocket: - port: http - initialDelaySeconds: {{ .Values.livenessProbe.initialDelaySeconds }} - periodSeconds: {{ .Values.livenessProbe.periodSeconds }} - readinessProbe: - tcpSocket: - port: http - initialDelaySeconds: {{ .Values.readinessProbe.initialDelaySeconds }} - periodSeconds: {{ .Values.readinessProbe.periodSeconds }} - resources: - {{- toYaml .Values.resources | nindent 12 }} - {{- with .Values.nodeSelector }} - nodeSelector: - {{- toYaml . | nindent 8 }} - {{- end }} - {{- with .Values.affinity }} - affinity: - {{- toYaml . | nindent 8 }} - {{- end }} - {{- with .Values.tolerations }} - tolerations: - {{- toYaml . | nindent 8 }} - {{- end }} diff --git a/infra/charts/feast-python-server/templates/service.yaml b/infra/charts/feast-python-server/templates/service.yaml deleted file mode 100644 index 86bf38a9a45..00000000000 --- a/infra/charts/feast-python-server/templates/service.yaml +++ /dev/null @@ -1,15 +0,0 @@ -apiVersion: v1 -kind: Service -metadata: - name: {{ include "feast-python-server.name" . }} - labels: - {{- include "feast-python-server.labels" . | nindent 4 }} -spec: - type: {{ .Values.service.type }} - ports: - - port: {{ .Values.service.port }} - targetPort: http - protocol: TCP - name: http - selector: - {{- include "feast-python-server.selectorLabels" . 
| nindent 4 }} diff --git a/infra/charts/feast-python-server/values.yaml b/infra/charts/feast-python-server/values.yaml deleted file mode 100644 index 6d0ab9c0ae4..00000000000 --- a/infra/charts/feast-python-server/values.yaml +++ /dev/null @@ -1,59 +0,0 @@ -# Default values for feast. -# This is a YAML-formatted file. -# Declare variables to be passed into your templates. - -replicaCount: 1 - -image: - # image.repository -- [required] The repository for the Docker image - repository: "" - pullPolicy: IfNotPresent - # image.tag -- [required] The Docker image tag - tag: "" - -imagePullSecrets: [] -nameOverride: "" -fullnameOverride: "" - -podAnnotations: {} - -podSecurityContext: {} - # fsGroup: 2000 - -securityContext: {} - # capabilities: - # drop: - # - ALL - # readOnlyRootFilesystem: true - # runAsNonRoot: true - # runAsUser: 1000 - -service: - type: ClusterIP - port: 80 - -resources: {} - # We usually recommend not to specify default resources and to leave this as a conscious - # choice for the user. This also increases chances charts run on environments with little - # resources, such as Minikube. If you do want to specify resources, uncomment the following - # lines, adjust them as necessary, and remove the curly braces after 'resources:'. 
- # limits: - # cpu: 100m - # memory: 128Mi - # requests: - # cpu: 100m - # memory: 128Mi - -nodeSelector: {} - -tolerations: [] - -affinity: {} - -livenessProbe: - initialDelaySeconds: 30 - periodSeconds: 30 - -readinessProbe: - initialDelaySeconds: 20 - periodSeconds: 10 diff --git a/infra/charts/feast/Chart.yaml b/infra/charts/feast/Chart.yaml index f0d6774a3cb..2586871119d 100644 --- a/infra/charts/feast/Chart.yaml +++ b/infra/charts/feast/Chart.yaml @@ -1,7 +1,7 @@ apiVersion: v1 description: Feature store for machine learning name: feast -version: 0.26.0 +version: 0.27.0 keywords: - machine learning - big data diff --git a/infra/charts/feast/README.md b/infra/charts/feast/README.md index b0dde94610b..9519729f541 100644 --- a/infra/charts/feast/README.md +++ b/infra/charts/feast/README.md @@ -8,7 +8,7 @@ This repo contains Helm charts for Feast Java components that are being installe ## Chart: Feast -Feature store for machine learning Current chart version is `0.26.0` +Feature store for machine learning Current chart version is `0.27.0` ## Installation @@ -65,8 +65,8 @@ See [here](https://github.com/feast-dev/feast/tree/master/examples/java-demo) fo | Repository | Name | Version | |------------|------|---------| | https://charts.helm.sh/stable | redis | 10.5.6 | -| https://feast-helm-charts.storage.googleapis.com | feature-server(feature-server) | 0.26.0 | -| https://feast-helm-charts.storage.googleapis.com | transformation-service(transformation-service) | 0.26.0 | +| https://feast-helm-charts.storage.googleapis.com | feature-server(feature-server) | 0.27.0 | +| https://feast-helm-charts.storage.googleapis.com | transformation-service(transformation-service) | 0.27.0 | ## Values diff --git a/infra/charts/feast/charts/feature-server/Chart.yaml b/infra/charts/feast/charts/feature-server/Chart.yaml index 6e577e1fb83..4874dca14ba 100644 --- a/infra/charts/feast/charts/feature-server/Chart.yaml +++ b/infra/charts/feast/charts/feature-server/Chart.yaml @@ -1,8 +1,8 
@@ apiVersion: v1 description: "Feast Feature Server: Online feature serving service for Feast" name: feature-server -version: 0.26.0 -appVersion: v0.26.0 +version: 0.27.0 +appVersion: v0.27.0 keywords: - machine learning - big data diff --git a/infra/charts/feast/charts/feature-server/README.md b/infra/charts/feast/charts/feature-server/README.md index f13a6488c39..92dd950a22a 100644 --- a/infra/charts/feast/charts/feature-server/README.md +++ b/infra/charts/feast/charts/feature-server/README.md @@ -1,6 +1,6 @@ # feature-server -![Version: 0.26.0](https://img.shields.io/badge/Version-0.26.0-informational?style=flat-square) ![AppVersion: v0.26.0](https://img.shields.io/badge/AppVersion-v0.26.0-informational?style=flat-square) +![Version: 0.27.0](https://img.shields.io/badge/Version-0.27.0-informational?style=flat-square) ![AppVersion: v0.27.0](https://img.shields.io/badge/AppVersion-v0.27.0-informational?style=flat-square) Feast Feature Server: Online feature serving service for Feast @@ -17,7 +17,7 @@ Feast Feature Server: Online feature serving service for Feast | envOverrides | object | `{}` | Extra environment variables to set | | image.pullPolicy | string | `"IfNotPresent"` | Image pull policy | | image.repository | string | `"feastdev/feature-server-java"` | Docker image for Feature Server repository | -| image.tag | string | `"0.26.0"` | Image tag | +| image.tag | string | `"0.27.0"` | Image tag | | ingress.grpc.annotations | object | `{}` | Extra annotations for the ingress | | ingress.grpc.auth.enabled | bool | `false` | Flag to enable auth | | ingress.grpc.class | string | `"nginx"` | Which ingress controller to use | diff --git a/infra/charts/feast/charts/feature-server/values.yaml b/infra/charts/feast/charts/feature-server/values.yaml index 66795aeba71..529ca53d09a 100644 --- a/infra/charts/feast/charts/feature-server/values.yaml +++ b/infra/charts/feast/charts/feature-server/values.yaml @@ -5,7 +5,7 @@ image: # image.repository -- Docker image for 
Feature Server repository repository: feastdev/feature-server-java # image.tag -- Image tag - tag: 0.26.0 + tag: 0.27.0 # image.pullPolicy -- Image pull policy pullPolicy: IfNotPresent diff --git a/infra/charts/feast/charts/transformation-service/Chart.yaml b/infra/charts/feast/charts/transformation-service/Chart.yaml index 20321e687a3..cf12aa7a79a 100644 --- a/infra/charts/feast/charts/transformation-service/Chart.yaml +++ b/infra/charts/feast/charts/transformation-service/Chart.yaml @@ -1,8 +1,8 @@ apiVersion: v1 description: "Transformation service: to compute on-demand features" name: transformation-service -version: 0.26.0 -appVersion: v0.26.0 +version: 0.27.0 +appVersion: v0.27.0 keywords: - machine learning - big data diff --git a/infra/charts/feast/charts/transformation-service/README.md b/infra/charts/feast/charts/transformation-service/README.md index 8b44c218df9..b137e9b544b 100644 --- a/infra/charts/feast/charts/transformation-service/README.md +++ b/infra/charts/feast/charts/transformation-service/README.md @@ -1,6 +1,6 @@ # transformation-service -![Version: 0.26.0](https://img.shields.io/badge/Version-0.26.0-informational?style=flat-square) ![AppVersion: v0.26.0](https://img.shields.io/badge/AppVersion-v0.26.0-informational?style=flat-square) +![Version: 0.27.0](https://img.shields.io/badge/Version-0.27.0-informational?style=flat-square) ![AppVersion: v0.27.0](https://img.shields.io/badge/AppVersion-v0.27.0-informational?style=flat-square) Transformation service: to compute on-demand features @@ -13,7 +13,7 @@ Transformation service: to compute on-demand features | envOverrides | object | `{}` | Extra environment variables to set | | image.pullPolicy | string | `"IfNotPresent"` | Image pull policy | | image.repository | string | `"feastdev/feature-transformation-server"` | Docker image for Transformation Server repository | -| image.tag | string | `"0.26.0"` | Image tag | +| image.tag | string | `"0.27.0"` | Image tag | | nodeSelector | object | `{}` 
| Node labels for pod assignment | | podLabels | object | `{}` | Labels to be added to Feast Serving pods | | replicaCount | int | `1` | Number of pods that will be created | diff --git a/infra/charts/feast/charts/transformation-service/values.yaml b/infra/charts/feast/charts/transformation-service/values.yaml index ecaa16e614f..eb5876a18b3 100644 --- a/infra/charts/feast/charts/transformation-service/values.yaml +++ b/infra/charts/feast/charts/transformation-service/values.yaml @@ -5,7 +5,7 @@ image: # image.repository -- Docker image for Transformation Server repository repository: feastdev/feature-transformation-server # image.tag -- Image tag - tag: 0.26.0 + tag: 0.27.0 # image.pullPolicy -- Image pull policy pullPolicy: IfNotPresent diff --git a/infra/charts/feast/requirements.yaml b/infra/charts/feast/requirements.yaml index f9f23092f5d..94d34d0f335 100644 --- a/infra/charts/feast/requirements.yaml +++ b/infra/charts/feast/requirements.yaml @@ -1,12 +1,12 @@ dependencies: - name: feature-server alias: feature-server - version: 0.26.0 + version: 0.27.0 condition: feature-server.enabled repository: https://feast-helm-charts.storage.googleapis.com - name: transformation-service alias: transformation-service - version: 0.26.0 + version: 0.27.0 condition: transformation-service.enabled repository: https://feast-helm-charts.storage.googleapis.com - name: redis diff --git a/infra/scripts/helm/push-helm-charts.sh b/infra/scripts/helm/push-helm-charts.sh index 1c32ee985b8..338751c28b8 100755 --- a/infra/scripts/helm/push-helm-charts.sh +++ b/infra/scripts/helm/push-helm-charts.sh @@ -16,10 +16,8 @@ helm repo add feast-helm-chart-repo $bucket cd infra/charts helm package feast -helm package feast-python-server helm package feast-feature-server helm gcs push --public feast-${1}.tgz feast-helm-chart-repo --force -helm gcs push --public feast-python-server-${1}.tgz feast-helm-chart-repo --force helm gcs push --public feast-feature-server-${1}.tgz feast-helm-chart-repo 
--force rm -f ./*.tgz \ No newline at end of file diff --git a/infra/scripts/helm/validate-helm-chart-versions.sh b/infra/scripts/helm/validate-helm-chart-versions.sh index cd8317222bd..8a088a006d8 100755 --- a/infra/scripts/helm/validate-helm-chart-versions.sh +++ b/infra/scripts/helm/validate-helm-chart-versions.sh @@ -3,7 +3,7 @@ set -e # Amount of file locations that need to be bumped in unison when versions increment -UNIQUE_VERSIONS_COUNT=22 # Change in release 0.24.0 +UNIQUE_VERSIONS_COUNT=20 # Change in release 0.27.0 if [ $# -ne 1 ]; then echo "Please provide a single semver version (without a \"v\" prefix) to test the repository against, e.g 0.99.0" diff --git a/infra/scripts/release/files_to_bump.txt b/infra/scripts/release/files_to_bump.txt index d7588185ded..61a70ac6b3c 100644 --- a/infra/scripts/release/files_to_bump.txt +++ b/infra/scripts/release/files_to_bump.txt @@ -7,8 +7,6 @@ infra/charts/feast/charts/feature-server/Chart.yaml 4 5 infra/charts/feast/charts/feature-server/README.md 3 20 infra/charts/feast/charts/feature-server/values.yaml 8 infra/charts/feast/README.md 11 68 69 -infra/charts/feast-python-server/Chart.yaml 5 -infra/charts/feast-python-server/README.md 5 infra/charts/feast-feature-server/Chart.yaml 5 infra/charts/feast-feature-server/README.md 3 infra/charts/feast-feature-server/values.yaml 12 diff --git a/infra/templates/README.md.jinja2 b/infra/templates/README.md.jinja2 index e59a364d819..e5213ae9f17 100644 --- a/infra/templates/README.md.jinja2 +++ b/infra/templates/README.md.jinja2 @@ -25,10 +25,10 @@ Feast allows ML platform teams to: * **Avoid data leakage** by generating point-in-time correct feature sets so data scientists can focus on feature engineering rather than debugging error-prone dataset joining logic. This ensure that future feature values do not leak to models during training. 
* **Decouple ML from data infrastructure** by providing a single data access layer that abstracts feature storage from feature retrieval, ensuring models remain portable as you move from training models to serving models, from batch models to realtime models, and from one data infra system to another. -Please see our [documentation](https://docs.feast.dev/) for more information about the project. +Please see our [documentation](https://docs.feast.dev/) for more information about the project, or sign up for an [email newsletter](https://feast.dev/). ## 📐 Architecture -![](docs/assets/feast-marchitecture.png) +![](docs/assets/feast_marchitecture.png) The above architecture is the minimal Feast deployment. Want to run the full Feast on Snowflake/GCP/AWS? Click [here](https://docs.feast.dev/how-to-guides/feast-snowflake-gcp-aws). diff --git a/java/pom.xml b/java/pom.xml index 3b9910a7cfa..5f1c8f5df90 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -35,7 +35,7 @@ - 0.26.0 + 0.27.0 https://github.com/feast-dev/feast UTF-8 diff --git a/sdk/python/docs/index.rst b/sdk/python/docs/index.rst index ca96782db8a..4cedffb1fc0 100644 --- a/sdk/python/docs/index.rst +++ b/sdk/python/docs/index.rst @@ -448,3 +448,15 @@ Snowflake Engine .. autoclass:: feast.infra.materialization.aws_lambda.lambda_engine.LambdaMaterializationJob :members: + +(Alpha) Spark Engine +--------------------------- + +.. autoclass:: feast.infra.materialization.contrib.spark.spark_materialization_engine.SparkMaterializationEngine + :members: + +.. autoclass:: feast.infra.materialization.contrib.spark.spark_materialization_engine.SparkMaterializationEngineConfig + :members: + +.. 
autoclass:: feast.infra.materialization.contrib.spark.spark_materialization_engine.SparkMaterializationJob + :members: \ No newline at end of file diff --git a/sdk/python/docs/source/feast.infra.online_stores.rst b/sdk/python/docs/source/feast.infra.online_stores.rst index 65758c409c0..59ac1868f58 100644 --- a/sdk/python/docs/source/feast.infra.online_stores.rst +++ b/sdk/python/docs/source/feast.infra.online_stores.rst @@ -12,6 +12,14 @@ Subpackages Submodules ---------- +feast.infra.online\_stores.bigtable module +------------------------------------------ + +.. automodule:: feast.infra.online_stores.bigtable + :members: + :undoc-members: + :show-inheritance: + feast.infra.online\_stores.datastore module ------------------------------------------- diff --git a/sdk/python/docs/source/index.rst b/sdk/python/docs/source/index.rst index ca96782db8a..4cedffb1fc0 100644 --- a/sdk/python/docs/source/index.rst +++ b/sdk/python/docs/source/index.rst @@ -448,3 +448,15 @@ Snowflake Engine .. autoclass:: feast.infra.materialization.aws_lambda.lambda_engine.LambdaMaterializationJob :members: + +(Alpha) Spark Engine +--------------------------- + +.. autoclass:: feast.infra.materialization.contrib.spark.spark_materialization_engine.SparkMaterializationEngine + :members: + +.. autoclass:: feast.infra.materialization.contrib.spark.spark_materialization_engine.SparkMaterializationEngineConfig + :members: + +.. autoclass:: feast.infra.materialization.contrib.spark.spark_materialization_engine.SparkMaterializationJob + :members: \ No newline at end of file diff --git a/sdk/python/feast/data_source.py b/sdk/python/feast/data_source.py index 54a68ed0480..05df7a90976 100644 --- a/sdk/python/feast/data_source.py +++ b/sdk/python/feast/data_source.py @@ -182,8 +182,6 @@ class DataSource(ABC): tags (optional): A dictionary of key-value pairs to store arbitrary metadata. owner (optional): The owner of the data source, typically the email of the primary maintainer. 
- timestamp_field (optional): Event timestamp field used for point in time - joins of feature values. date_partition_column (optional): Timestamp column used for partitioning. Not supported by all offline stores. """ diff --git a/sdk/python/feast/errors.py b/sdk/python/feast/errors.py index 15ba86781df..042a3622a98 100644 --- a/sdk/python/feast/errors.py +++ b/sdk/python/feast/errors.py @@ -56,6 +56,14 @@ def __init__(self, name, project=None): super().__init__(f"Feature view {name} does not exist") +class InvalidSparkSessionException(Exception): + def __init__(self, spark_arg): + super().__init__( + f" Need Spark Session to convert results to spark data frame\ + recieved {type(spark_arg)} instead. " + ) + + class OnDemandFeatureViewNotFoundException(FeastObjectNotFoundException): def __init__(self, name, project=None): if project: diff --git a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py index 1fad2c909fa..bf5229303ab 100644 --- a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py +++ b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py @@ -3,9 +3,10 @@ import pyarrow as pa import pyarrow.parquet as pq import s3fs -from bytewax import Dataflow, cluster_main # type: ignore -from bytewax.inputs import AdvanceTo, Emit, ManualInputConfig, distribute -from bytewax.parse import proc_env +from bytewax.dataflow import Dataflow # type: ignore +from bytewax.execution import cluster_main +from bytewax.inputs import ManualInputConfig, distribute +from bytewax.outputs import ManualOutputConfig from tqdm import tqdm from feast import FeatureStore, FeatureView, RepoConfig @@ -37,20 +38,15 @@ def process_path(self, path): return batches - def input_builder(self, worker_index, worker_count, resume_epoch): + def input_builder(self, worker_index, worker_count, 
_state): worker_paths = distribute(self.paths, worker_index, worker_count) - epoch = 0 for path in worker_paths: - yield AdvanceTo(epoch) - yield Emit(path) - epoch += 1 + yield None, path return def output_builder(self, worker_index, worker_count): - def output_fn(epoch_batch): - _, batch = epoch_batch - + def output_fn(batch): table = pa.Table.from_batches([batch]) if self.feature_view.batch_source.field_mapping is not None: @@ -79,11 +75,7 @@ def output_fn(epoch_batch): def _run_dataflow(self): flow = Dataflow() + flow.input("inp", ManualInputConfig(self.input_builder)) flow.flat_map(self.process_path) - flow.capture() - cluster_main( - flow, - ManualInputConfig(self.input_builder), - self.output_builder, - **proc_env(), - ) + flow.capture(ManualOutputConfig(self.output_builder)) + cluster_main(flow, [], 0) diff --git a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py index 0477722eb16..9a456376bf4 100644 --- a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py +++ b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py @@ -23,7 +23,7 @@ from feast.infra.registry.base_registry import BaseRegistry from feast.repo_config import FeastConfigBaseModel from feast.stream_feature_view import StreamFeatureView -from feast.utils import _get_column_names +from feast.utils import _get_column_names, get_default_yaml_file_path from .bytewax_materialization_job import BytewaxMaterializationJob @@ -157,9 +157,6 @@ def _create_kubernetes_job(self, job_id, paths, feature_view): # Create a k8s configmap with information needed by bytewax self._create_configuration_map(job_id, paths, feature_view, self.namespace) - # Create the k8s service definition, used for bytewax communication - self._create_service_definition(job_id, self.namespace) - # Create the k8s job definition 
self._create_job_definition( job_id, @@ -175,14 +172,10 @@ def _create_kubernetes_job(self, job_id, paths, feature_view): def _create_configuration_map(self, job_id, paths, feature_view, namespace): """Create a Kubernetes configmap for this job""" - feature_store_configuration = yaml.dump( - yaml.safe_load( - self.repo_config.json( - exclude={"repo_path"}, - exclude_unset=True, - ) - ) - ) + repo_path = self.repo_config.repo_path + assert repo_path + feature_store_path = get_default_yaml_file_path(repo_path) + feature_store_configuration = feature_store_path.read_text() materialization_config = yaml.dump( {"paths": paths, "feature_view": feature_view.name} @@ -204,41 +197,6 @@ def _create_configuration_map(self, job_id, paths, feature_view, namespace): body=configmap_manifest, ) - def _create_service_definition(self, job_id, namespace): - """Creates a kubernetes service definition. - - This service definition is created to allow bytewax workers - to communicate with each other. - """ - service_definition = { - "apiVersion": "v1", - "kind": "Service", - "metadata": { - "name": f"dataflow-{job_id}", - "namespace": namespace, - }, - "spec": { - "clusterIP": "None", - "clusterIPs": ["None"], - "internalTrafficPolicy": "Cluster", - "ipFamilies": ["IPv4"], - "ipFamilyPolicy": "SingleStack", - "ports": [ - { - "name": "worker", - "port": 9999, - "protocol": "TCP", - "targetPort": 9999, - } - ], - "selector": {"job-name": f"dataflow-{job_id}"}, - "sessionAffinity": "None", - "type": "ClusterIP", - }, - } - - utils.create_from_dict(self.k8s_client, service_definition) - def _create_job_definition(self, job_id, namespace, pods, env): """Create a kubernetes job definition.""" job_env = [ @@ -269,10 +227,6 @@ def _create_job_definition(self, job_id, namespace, pods, env): "name": "BYTEWAX_KEEP_CONTAINER_ALIVE", "value": "false", }, - { - "name": "BYTEWAX_HOSTFILE_PATH", - "value": "/etc/bytewax/hostfile.txt", - }, { "name": "BYTEWAX_STATEFULSET_NAME", "value": 
f"dataflow-{job_id}", @@ -299,11 +253,6 @@ def _create_job_definition(self, job_id, namespace, pods, env): "subdomain": f"dataflow-{job_id}", "initContainers": [ { - "command": [ - "sh", - "-c", - f'set -ex\n# Generate hostfile.txt.\necho "dataflow-{job_id}-0.dataflow-{job_id}.{namespace}.svc.cluster.local:9999" > /etc/bytewax/hostfile.txt\nreplicas=$(($BYTEWAX_REPLICAS-1))\nx=1\nwhile [ $x -le $replicas ]\ndo\n echo "dataflow-{job_id}-$x.dataflow-{job_id}.{namespace}.svc.cluster.local:9999" >> /etc/bytewax/hostfile.txt\n x=$(( $x + 1 ))\ndone', - ], "env": [ { "name": "BYTEWAX_REPLICAS", diff --git a/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py b/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py index 66eb97bca78..00f2c950a28 100644 --- a/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py +++ b/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py @@ -1,4 +1,3 @@ -import tempfile from dataclasses import dataclass from datetime import datetime from typing import Callable, List, Literal, Optional, Sequence, Union @@ -196,7 +195,7 @@ class _SparkSerializedArtifacts: """Class to assist with serializing unpicklable artifacts to the spark workers""" feature_view_proto: str - repo_config_file: str + repo_config_byte: str @classmethod def serialize(cls, feature_view, repo_config): @@ -205,12 +204,10 @@ def serialize(cls, feature_view, repo_config): feature_view_proto = feature_view.to_proto().SerializeToString() # serialize repo_config to disk. 
Will be used to instantiate the online store - repo_config_file = tempfile.NamedTemporaryFile(delete=False).name - with open(repo_config_file, "wb") as f: - dill.dump(repo_config, f) + repo_config_byte = dill.dumps(repo_config) return _SparkSerializedArtifacts( - feature_view_proto=feature_view_proto, repo_config_file=repo_config_file + feature_view_proto=feature_view_proto, repo_config_byte=repo_config_byte ) def unserialize(self): @@ -220,8 +217,7 @@ def unserialize(self): feature_view = FeatureView.from_proto(proto) # load - with open(self.repo_config_file, "rb") as f: - repo_config = dill.load(f) + repo_config = dill.loads(self.repo_config_byte) provider = PassthroughProvider(repo_config) online_store = provider.online_store diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py index 01f89f80bb7..665a65fec53 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py +++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py @@ -18,7 +18,7 @@ from feast import FeatureView, OnDemandFeatureView from feast.data_source import DataSource -from feast.errors import InvalidEntityType +from feast.errors import EntitySQLEmptyResults, InvalidEntityType from feast.feature_view import DUMMY_ENTITY_ID, DUMMY_ENTITY_VAL from feast.infra.offline_stores import offline_utils from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import ( @@ -449,7 +449,13 @@ def _get_entity_df_event_timestamp_range( # If the entity_df is a string (SQL query), determine range # from table df = spark_session.sql(entity_df).select(entity_df_event_timestamp_col) + + # Checks if executing entity sql resulted in any data + if df.rdd.isEmpty(): + raise EntitySQLEmptyResults(entity_df) + # TODO(kzhang132): need utc conversion here. 
+ entity_df_event_timestamp_range = ( df.agg({entity_df_event_timestamp_col: "min"}).collect()[0][0], df.agg({entity_df_event_timestamp_col: "max"}).collect()[0][0], diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index 09216ff8ff8..29897aef430 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -662,14 +662,33 @@ def _drop_duplicates( created_timestamp_column: str, entity_df_event_timestamp_col: str, ) -> dd.DataFrame: - if created_timestamp_column: - df_to_join = df_to_join.sort_values( - by=created_timestamp_column, na_position="first" - ) + column_order = df_to_join.columns + + # try-catch block is added to deal with this issue https://github.com/dask/dask/issues/8939. + # TODO(kevjumba): remove try catch when fix is merged upstream in Dask. + try: + if created_timestamp_column: + df_to_join = df_to_join.sort_values( + by=created_timestamp_column, na_position="first" + ) + df_to_join = df_to_join.persist() + + df_to_join = df_to_join.sort_values(by=timestamp_field, na_position="first") df_to_join = df_to_join.persist() - df_to_join = df_to_join.sort_values(by=timestamp_field, na_position="first") - df_to_join = df_to_join.persist() + except ZeroDivisionError: + # Use 1 partition to get around case where everything in timestamp column is the same so the partition algorithm doesn't + # try to divide by zero. 
+ if created_timestamp_column: + df_to_join = df_to_join[column_order].sort_values( + by=created_timestamp_column, na_position="first", npartitions=1 + ) + df_to_join = df_to_join.persist() + + df_to_join = df_to_join[column_order].sort_values( + by=timestamp_field, na_position="first", npartitions=1 + ) + df_to_join = df_to_join.persist() df_to_join = df_to_join.drop_duplicates( all_join_keys + [entity_df_event_timestamp_col], diff --git a/sdk/python/feast/infra/offline_stores/file_source.py b/sdk/python/feast/infra/offline_stores/file_source.py index 81a83c22457..e9f3735dee6 100644 --- a/sdk/python/feast/infra/offline_stores/file_source.py +++ b/sdk/python/feast/infra/offline_stores/file_source.py @@ -56,7 +56,7 @@ def __init__( tags (optional): A dictionary of key-value pairs to store arbitrary metadata. owner (optional): The owner of the file source, typically the email of the primary maintainer. - timestamp_field (optional): Event timestamp foe;d used for point in time + timestamp_field (optional): Event timestamp field used for point in time joins of feature values. 
Examples: diff --git a/sdk/python/feast/infra/offline_stores/snowflake.py b/sdk/python/feast/infra/offline_stores/snowflake.py index 2d621de50ff..330c2ffae54 100644 --- a/sdk/python/feast/infra/offline_stores/snowflake.py +++ b/sdk/python/feast/infra/offline_stores/snowflake.py @@ -1,7 +1,9 @@ import contextlib import os import uuid +import warnings from datetime import datetime +from functools import reduce from pathlib import Path from typing import ( Any, @@ -21,11 +23,16 @@ import pyarrow from pydantic import Field, StrictStr from pydantic.typing import Literal +from pyspark.sql import DataFrame, SparkSession from pytz import utc from feast import OnDemandFeatureView from feast.data_source import DataSource -from feast.errors import EntitySQLEmptyResults, InvalidEntityType +from feast.errors import ( + EntitySQLEmptyResults, + InvalidEntityType, + InvalidSparkSessionException, +) from feast.feature_logging import LoggingConfig, LoggingSource from feast.feature_view import DUMMY_ENTITY_ID, DUMMY_ENTITY_VAL, FeatureView from feast.infra.offline_stores import offline_utils @@ -57,6 +64,8 @@ raise FeastExtrasDependencyImportError("snowflake", str(e)) +warnings.filterwarnings("ignore", category=DeprecationWarning) + class SnowflakeOfflineStoreConfig(FeastConfigBaseModel): """Offline store config for Snowflake""" @@ -447,6 +456,41 @@ def to_sql(self) -> str: with self._query_generator() as query: return query + def to_spark_df(self, spark_session: SparkSession) -> DataFrame: + """ + Method to convert snowflake query results to pyspark data frame. + + Args: + spark_session: spark Session variable of current environment. + + Returns: + spark_df: A pyspark dataframe. 
+ """ + + if isinstance(spark_session, SparkSession): + with self._query_generator() as query: + + arrow_batches = execute_snowflake_statement( + self.snowflake_conn, query + ).fetch_arrow_batches() + + if arrow_batches: + spark_df = reduce( + DataFrame.unionAll, + [ + spark_session.createDataFrame(batch.to_pandas()) + for batch in arrow_batches + ], + ) + + return spark_df + + else: + raise EntitySQLEmptyResults(query) + + else: + raise InvalidSparkSessionException(spark_session) + def persist(self, storage: SavedDatasetStorage, allow_overwrite: bool = False): assert isinstance(storage, SavedDatasetSnowflakeStorage) self.to_snowflake(table_name=storage.snowflake_options.table) diff --git a/sdk/python/feast/infra/offline_stores/snowflake_source.py b/sdk/python/feast/infra/offline_stores/snowflake_source.py index 40e50b3cab9..cc5208a6761 100644 --- a/sdk/python/feast/infra/offline_stores/snowflake_source.py +++ b/sdk/python/feast/infra/offline_stores/snowflake_source.py @@ -263,6 +263,16 @@ def get_table_column_names_and_types( result.dtypes[column].name ] else: + if len(result) > 0: + max_value = result.iloc[0][0] + if max_value is not None and len(str(max_value)) <= 9: + row["snowflake_type"] = "NUMBER32" + continue + elif ( + max_value is not None and len(str(max_value)) <= 18 + ): + row["snowflake_type"] = "NUMBER64" + continue raise NotImplementedError( "NaNs or Numbers larger than INT64 are not supported" ) diff --git a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/README.md b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/README.md index 3dea1917aa1..7d9393f30ec 100644 --- a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/README.md +++ b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/README.md @@ -58,6 +58,8 @@ online_store: load_balancing: # optional local_dc: 'datacenter1' # optional load_balancing_policy: 'TokenAwarePolicy(DCAwareRoundRobinPolicy)' # optional + 
read_concurrency: 100 # optional + write_concurrency: 100 # optional ``` #### Astra DB setup: @@ -84,6 +86,8 @@ online_store: load_balancing: # optional local_dc: 'eu-central-1' # optional load_balancing_policy: 'TokenAwarePolicy(DCAwareRoundRobinPolicy)' # optional + read_concurrency: 100 # optional + write_concurrency: 100 # optional ``` #### Protocol version and load-balancing settings @@ -111,6 +115,14 @@ The former parameter is a region name for Astra DB instances (as can be verified See the source code of the online store integration for the allowed values of the latter parameter. +#### Read/write concurrency value + +You can optionally specify the value of `read_concurrency` and `write_concurrency`, +which will be passed to the Cassandra driver function handling +[concurrent reading/writing of multiple entities](https://docs.datastax.com/en/developer/python-driver/3.25/api/cassandra/concurrent/#module-cassandra.concurrent). +Consult the reference for guidance on this parameter (which in most cases can be left to its default value of). +This is relevant only for retrieval of several entities at once and during bulk writes, such as in the materialization step. 
+ ### More info For a more detailed walkthrough, please see the diff --git a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py index 71d3167c46f..34a8cab036d 100644 --- a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py +++ b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py @@ -30,6 +30,7 @@ ResultSet, Session, ) +from cassandra.concurrent import execute_concurrent_with_args from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy from cassandra.query import PreparedStatement from pydantic import StrictFloat, StrictInt, StrictStr @@ -166,6 +167,22 @@ class CassandraLoadBalancingPolicy(FeastConfigBaseModel): wrapped into an execution profile if present. """ + read_concurrency: Optional[StrictInt] = 100 + """ + Value of the `concurrency` parameter internally passed to Cassandra driver's + `execute_concurrent_with_args` call when reading rows from tables. + See https://docs.datastax.com/en/developer/python-driver/3.25/api/cassandra/concurrent/#module-cassandra.concurrent . + Default: 100. + """ + + write_concurrency: Optional[StrictInt] = 100 + """ + Value of the `concurrency` parameter internally passed to Cassandra driver's + `execute_concurrent_with_args` call when writing rows to tables. + See https://docs.datastax.com/en/developer/python-driver/3.25/api/cassandra/concurrent/#module-cassandra.concurrent . + Default: 100. + """ + class CassandraOnlineStore(OnlineStore): """ @@ -318,21 +335,37 @@ def online_write_batch( display progress. 
""" project = config.project - for entity_key, values, timestamp, created_ts in data: - entity_key_bin = serialize_entity_key( - entity_key, - entity_key_serialization_version=config.entity_key_serialization_version, - ).hex() - with tracing_span(name="remote_call"): - self._write_rows( - config, - project, - table, - entity_key_bin, - values.items(), - timestamp, - created_ts, - ) + + def unroll_insertion_tuples() -> Iterable[Tuple[str, bytes, str, datetime]]: + """ + We craft an iterable over all rows to be inserted (entities->features), + but this way we can call `progress` after each entity is done. + """ + for entity_key, values, timestamp, created_ts in data: + entity_key_bin = serialize_entity_key( + entity_key, + entity_key_serialization_version=config.entity_key_serialization_version, + ).hex() + for feature_name, val in values.items(): + params: Tuple[str, bytes, str, datetime] = ( + feature_name, + val.SerializeToString(), + entity_key_bin, + timestamp, + ) + yield params + # this happens N-1 times, will be corrected outside: + if progress: + progress(1) + + with tracing_span(name="remote_call"): + self._write_rows_concurrently( + config, + project, + table, + unroll_insertion_tuples(), + ) + # correction for the last missing call to `progress`: if progress: progress(1) @@ -358,32 +391,36 @@ def online_read( result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] - for entity_key in entity_keys: - entity_key_bin = serialize_entity_key( + entity_key_bins = [ + serialize_entity_key( entity_key, entity_key_serialization_version=config.entity_key_serialization_version, ).hex() + for entity_key in entity_keys + ] + + with tracing_span(name="remote_call"): + feature_rows_sequence = self._read_rows_by_entity_keys( + config, + project, + table, + entity_key_bins, + columns=["feature_name", "value", "event_ts"], + ) - with tracing_span(name="remote_call"): - feature_rows = self._read_rows_by_entity_key( - config, - project, - table, - 
entity_key_bin, - columns=["feature_name", "value", "event_ts"], - ) - + for entity_key_bin, feature_rows in zip(entity_key_bins, feature_rows_sequence): res = {} res_ts = None - for feature_row in feature_rows: - if ( - requested_features is None - or feature_row.feature_name in requested_features - ): - val = ValueProto() - val.ParseFromString(feature_row.value) - res[feature_row.feature_name] = val - res_ts = feature_row.event_ts + if feature_rows: + for feature_row in feature_rows: + if ( + requested_features is None + or feature_row.feature_name in requested_features + ): + val = ValueProto() + val.ParseFromString(feature_row.value) + res[feature_row.feature_name] = val + res_ts = feature_row.event_ts if not res: result.append((None, None)) else: @@ -445,46 +482,31 @@ def _fq_table_name(keyspace: str, project: str, table: FeatureView) -> str: """ return f'"{keyspace}"."{project}_{table.name}"' - def _write_rows( + def _write_rows_concurrently( self, config: RepoConfig, project: str, table: FeatureView, - entity_key_bin: str, - features_vals: Iterable[Tuple[str, ValueProto]], - timestamp: datetime, - created_ts: Optional[datetime], + rows: Iterable[Tuple[str, bytes, str, datetime]], ): - """ - Handle the CQL (low-level) insertion of feature values to a table. - - Note: `created_ts` can be None: in that case we avoid explicitly - inserting it to prevent unnecessary tombstone creation on Cassandra. - Note: `created_ts` is being deprecated (July 2022) and the following - reflects this fact. 
- """ session: Session = self._get_session(config) keyspace: str = self._keyspace fqtable = CassandraOnlineStore._fq_table_name(keyspace, project, table) insert_cql = self._get_cql_statement(config, "insert4", fqtable=fqtable) - for feature_name, val in features_vals: - params: Sequence[object] = ( - feature_name, - val.SerializeToString(), - entity_key_bin, - timestamp, - ) - session.execute( - insert_cql, - params, - ) + # + execute_concurrent_with_args( + session, + insert_cql, + rows, + concurrency=config.online_store.write_concurrency, + ) - def _read_rows_by_entity_key( + def _read_rows_by_entity_keys( self, config: RepoConfig, project: str, table: FeatureView, - entity_key_bin: str, + entity_key_bins: List[str], columns: Optional[List[str]] = None, ) -> ResultSet: """ @@ -500,7 +522,25 @@ def _read_rows_by_entity_key( fqtable=fqtable, columns=projection_columns, ) - return session.execute(select_cql, [entity_key_bin]) + retrieval_results = execute_concurrent_with_args( + session, + select_cql, + ((entity_key_bin,) for entity_key_bin in entity_key_bins), + concurrency=config.online_store.read_concurrency, + ) + # execute_concurrent_with_args return a sequence + # of (success, result_or_exception) pairs: + returned_sequence = [] + for success, result_or_exception in retrieval_results: + if success: + returned_sequence.append(result_or_exception) + else: + # an exception + logger.error( + f"Cassandra online store exception during concurrent fetching: {str(result_or_exception)}" + ) + returned_sequence.append(None) + return returned_sequence def _drop_table( self, diff --git a/sdk/python/feast/templates/athena/feature_repo/feature_store.yaml b/sdk/python/feast/templates/athena/feature_repo/feature_store.yaml index 13e7898e861..bd12e906d1f 100644 --- a/sdk/python/feast/templates/athena/feature_repo/feature_store.yaml +++ b/sdk/python/feast/templates/athena/feature_repo/feature_store.yaml @@ -6,8 +6,9 @@ online_store: path: online_store.db offline_store: type: 
athena - region: ap-northeast-2 - database: sampledb + region: {AWS region} + database: {The database in the data catalog to be used by Athena} data_source: AwsDataCatalog - s3_staging_location: s3://sagemaker-yelo-test + s3_staging_location: s3://{S3 bucket to be used by Feast} + workgroup: {Workgroup for Athena} entity_key_serialization_version: 2 \ No newline at end of file diff --git a/sdk/python/feast/templates/athena/feature_repo/test_workflow.py b/sdk/python/feast/templates/athena/feature_repo/test_workflow.py index 7d7daff8650..bf69a4bff05 100644 --- a/sdk/python/feast/templates/athena/feature_repo/test_workflow.py +++ b/sdk/python/feast/templates/athena/feature_repo/test_workflow.py @@ -3,16 +3,28 @@ import pandas as pd -from feast import Entity, Feature, FeatureStore, FeatureView, ValueType +from feast import Entity, FeatureStore, FeatureView, Field from feast.infra.offline_stores.contrib.athena_offline_store.athena_source import ( AthenaSource, ) +from feast.types import Float64, Int64 def test_end_to_end(): try: - fs = FeatureStore(".") + + # Before running this test method + # 1. Upload the driver_stats.parquet file to your S3 bucket. + # (https://github.com/feast-dev/feast-custom-offline-store-demo/tree/main/feature_repo/data) + # 2. Using AWS Glue Crawler, create a table in the data catalog. The generated table can be queried through Athena. + # 3. Specify the S3 bucket name, data source(AwsDataCatalog), database name, Athena's workgroup, etc. in feature_store.yaml + + fs = FeatureStore("./feature_repo") + + # Partition pruning has a significant impact on Athena's query performance and cost. + # If offline feature dataset is large, it is highly recommended to create partitions using date columns such as ('created','event_timestamp') + # The date_partition_column must be in form of YYYY-MM-DD(string) as in the beginning of the date column. 
driver_hourly_stats = AthenaSource( timestamp_field="event_timestamp", @@ -21,31 +33,29 @@ def test_end_to_end(): database="sampledb", data_source="AwsDataCatalog", created_timestamp_column="created", - # date_partition_column="std_date" + # date_partition_column="std_date" #YYYY-MM-DD ) driver = Entity( name="driver_id", - value_type=ValueType.INT64, description="driver id", ) driver_hourly_stats_view = FeatureView( name="driver_hourly_stats", - entities=["driver_id"], - ttl=timedelta(days=365), - features=[ - Feature(name="conv_rate", dtype=ValueType.FLOAT), - Feature(name="acc_rate", dtype=ValueType.FLOAT), - Feature(name="avg_daily_trips", dtype=ValueType.INT64), + entities=[driver], + ttl=timedelta(days=500), + schema=[ + Field(name="conv_rate", dtype=Float64), + Field(name="acc_rate", dtype=Float64), + Field(name="avg_daily_trips", dtype=Int64), ], online=True, - batch_source=driver_hourly_stats, + source=driver_hourly_stats, ) # apply repository fs.apply([driver_hourly_stats, driver, driver_hourly_stats_view]) - print(fs.list_data_sources()) print(fs.list_feature_views()) @@ -54,7 +64,6 @@ def test_end_to_end(): ) # Read features from offline store - feature_vector = ( fs.get_historical_features( features=["driver_hourly_stats:conv_rate"], entity_df=entity_df diff --git a/sdk/python/feast/templates/cassandra/bootstrap.py b/sdk/python/feast/templates/cassandra/bootstrap.py index 464eba271fe..fa70917914f 100644 --- a/sdk/python/feast/templates/cassandra/bootstrap.py +++ b/sdk/python/feast/templates/cassandra/bootstrap.py @@ -70,7 +70,7 @@ def collect_cassandra_store_settings(): sys.exit(1) needs_port = click.confirm("Need to specify port?", default=False) if needs_port: - c_port = click.prompt("Port to use", default=9042, type=int) + c_port = click.prompt(" Port to use", default=9042, type=int) else: c_port = None use_auth = click.confirm( @@ -78,8 +78,8 @@ def collect_cassandra_store_settings(): default=False, ) if use_auth: - c_username = 
click.prompt("Database username") - c_password = click.prompt("Database password", hide_input=True) + c_username = click.prompt(" Database username") + c_password = click.prompt(" Database password", hide_input=True) else: c_username = None c_password = None @@ -95,7 +95,7 @@ def collect_cassandra_store_settings(): ) if specify_protocol_version: c_protocol_version = click.prompt( - "Protocol version", + " Protocol version", default={"A": 4, "C": 5}.get(db_type, 5), type=int, ) @@ -105,11 +105,11 @@ def collect_cassandra_store_settings(): specify_lb = click.confirm("Specify load-balancing?", default=False) if specify_lb: c_local_dc = click.prompt( - "Local datacenter (for load-balancing)", + " Local datacenter (for load-balancing)", default="datacenter1" if db_type == "C" else None, ) c_load_balancing_policy = click.prompt( - "Load-balancing policy", + " Load-balancing policy", type=click.Choice( [ "TokenAwarePolicy(DCAwareRoundRobinPolicy)", @@ -122,6 +122,18 @@ def collect_cassandra_store_settings(): c_local_dc = None c_load_balancing_policy = None + specify_concurrency = click.confirm("Specify concurrency levels?", default=False) + if specify_concurrency: + c_r_concurrency = click.prompt( + " Read-concurrency level?", default=100, type=int + ) + c_w_concurrency = click.prompt( + " Write-concurrency level?", default=100, type=int + ) + else: + c_r_concurrency = None + c_w_concurrency = None + return { "c_secure_bundle_path": c_secure_bundle_path, "c_hosts": c_hosts, @@ -132,6 +144,8 @@ def collect_cassandra_store_settings(): "c_protocol_version": c_protocol_version, "c_local_dc": c_local_dc, "c_load_balancing_policy": c_load_balancing_policy, + "c_r_concurrency": c_r_concurrency, + "c_w_concurrency": c_w_concurrency, } @@ -149,6 +163,8 @@ def apply_cassandra_store_settings(config_file, settings): 'c_protocol_version' 'c_local_dc' 'c_load_balancing_policy' + 'c_r_concurrency' + 'c_w_concurrency' """ write_setting_or_remove( config_file, @@ -217,6 +233,19 @@ def 
apply_cassandra_store_settings(config_file, settings): remove_lines_from_file(config_file, "local_dc:") remove_lines_from_file(config_file, "load_balancing_policy:") + write_setting_or_remove( + config_file, + settings["c_r_concurrency"], + "read_concurrency", + "c_r_concurrency", + ) + write_setting_or_remove( + config_file, + settings["c_w_concurrency"], + "write_concurrency", + "c_w_concurrency", + ) + def bootstrap(): """ diff --git a/sdk/python/feast/templates/cassandra/feature_repo/feature_store.yaml b/sdk/python/feast/templates/cassandra/feature_repo/feature_store.yaml index 076a0d7c6b4..ce50275554b 100644 --- a/sdk/python/feast/templates/cassandra/feature_repo/feature_store.yaml +++ b/sdk/python/feast/templates/cassandra/feature_repo/feature_store.yaml @@ -16,4 +16,6 @@ online_store: load_balancing: local_dc: c_local_dc load_balancing_policy: c_load_balancing_policy + read_concurrency: c_r_concurrency + write_concurrency: c_w_concurrency entity_key_serialization_version: 2 diff --git a/sdk/python/feast/templates/snowflake/test_workflow.py b/sdk/python/feast/templates/snowflake/test_workflow.py index 904d1e1f3e5..b121f229802 100644 --- a/sdk/python/feast/templates/snowflake/test_workflow.py +++ b/sdk/python/feast/templates/snowflake/test_workflow.py @@ -3,6 +3,7 @@ from datetime import datetime, timedelta import pandas as pd +import yaml from pytz import utc from feast import FeatureStore @@ -76,6 +77,10 @@ def fetch_historical_features_entity_sql(store: FeatureStore, for_batch_scoring) datetime.now().replace(microsecond=0, second=0, minute=0).astimezone(tz=utc) ) start_date = (end_date - timedelta(days=60)).astimezone(tz=utc) + + project_name = yaml.safe_load(open("feature_repo/feature_store.yaml"))["project"] + table_name = f"{project_name}_feast_driver_hourly_stats" + # For batch scoring, we want the latest timestamps if for_batch_scoring: print( @@ -86,7 +91,7 @@ def fetch_historical_features_entity_sql(store: FeatureStore, for_batch_scoring) SELECT 
"driver_id", CURRENT_TIMESTAMP() as "event_timestamp" - FROM {store.list_data_sources()[-1].get_table_query_string()} + FROM {store.get_data_source(table_name).get_table_query_string()} WHERE "event_timestamp" BETWEEN '{start_date}' AND '{end_date}' GROUP BY "driver_id" """ @@ -97,7 +102,7 @@ def fetch_historical_features_entity_sql(store: FeatureStore, for_batch_scoring) SELECT "driver_id", "event_timestamp" - FROM {store.list_data_sources()[-1].get_table_query_string()} + FROM {store.get_data_source(table_name).get_table_query_string()} WHERE "event_timestamp" BETWEEN '{start_date}' AND '{end_date}' """ diff --git a/sdk/python/feast/ui/package.json b/sdk/python/feast/ui/package.json index 442cb2466e2..259d48ea50a 100644 --- a/sdk/python/feast/ui/package.json +++ b/sdk/python/feast/ui/package.json @@ -6,7 +6,7 @@ "@elastic/datemath": "^5.0.3", "@elastic/eui": "^55.0.1", "@emotion/react": "^11.9.0", - "@feast-dev/feast-ui": "0.26.0", + "@feast-dev/feast-ui": "0.27.0", "@testing-library/jest-dom": "^5.16.4", "@testing-library/react": "^13.2.0", "@testing-library/user-event": "^13.5.0", diff --git a/sdk/python/feast/ui/yarn.lock b/sdk/python/feast/ui/yarn.lock index 66abae62440..c168c310f69 100644 --- a/sdk/python/feast/ui/yarn.lock +++ b/sdk/python/feast/ui/yarn.lock @@ -1300,10 +1300,10 @@ minimatch "^3.1.2" strip-json-comments "^3.1.1" -"@feast-dev/feast-ui@0.26.0": - version "0.26.0" - resolved "https://registry.yarnpkg.com/@feast-dev/feast-ui/-/feast-ui-0.26.0.tgz#502e61652024a2db9bc5b9b5fcfe3f3dce04161b" - integrity sha512-7TBL9zetO/7Kx+FECIaDYG+fCr7P4R6esWyZW08xJTEFEvds5VXdXvUTWCibA/sKH24dlNXwpiLpyujwqwpQgA== +"@feast-dev/feast-ui@0.27.0": + version "0.27.0" + resolved "https://registry.yarnpkg.com/@feast-dev/feast-ui/-/feast-ui-0.27.0.tgz#3a0c2ae54e0a0cfb94a6d9883d91d3dd5e649b2c" + integrity sha512-1meDDMt0SNR6u45zH9wVgakrejrywSlOPJnq5JLLkErOVLmJsFe1EN6oeGdJXKQKuk5rQGgwWX0y7Ox4o2T45g== dependencies: "@elastic/datemath" "^5.0.3" "@elastic/eui" "^55.0.1" @@ 
-6888,9 +6888,9 @@ loader-runner@^4.2.0: integrity sha512-3R/1M+yS3j5ou80Me59j7F9IMs4PXs3VqRrm0TU3AbKPxlmpoY1TNscJV/oGJXo8qCatFGTfDbY6W6ipGOYXfg== loader-utils@^2.0.0: - version "2.0.2" - resolved "https://registry.yarnpkg.com/loader-utils/-/loader-utils-2.0.2.tgz#d6e3b4fb81870721ae4e0868ab11dd638368c129" - integrity sha512-TM57VeHptv569d/GKh6TAYdzKblwDNiumOdkFnejjD0XwTH87K90w3O7AiJRqdQoXygvi1VQTJTLGhJl7WqA7A== + version "2.0.4" + resolved "https://registry.yarnpkg.com/loader-utils/-/loader-utils-2.0.4.tgz#8b5cb38b5c34a9a018ee1fc0e6a066d1dfcc528c" + integrity sha512-xXqpXoINfFhgua9xiqD8fPFHgkoq1mmmpE92WlDbm9rNRd/EbRb+Gqf908T2DMfuHjjJlksiK2RbHVOdD/MqSw== dependencies: big.js "^5.2.2" emojis-list "^3.0.0" diff --git a/sdk/python/feast/usage.py b/sdk/python/feast/usage.py index 0965e709998..18bb497182c 100644 --- a/sdk/python/feast/usage.py +++ b/sdk/python/feast/usage.py @@ -78,7 +78,7 @@ class FnCall: class Sampler: - def should_record(self, event) -> bool: + def should_record(self) -> bool: raise NotImplementedError @property @@ -87,7 +87,7 @@ def priority(self): class AlwaysSampler(Sampler): - def should_record(self, event) -> bool: + def should_record(self) -> bool: return True @@ -100,7 +100,7 @@ def __init__(self, ratio): self.total_counter = 0 self.sampled_counter = 0 - def should_record(self, event) -> bool: + def should_record(self) -> bool: self.total_counter += 1 if self.total_counter == self.MAX_COUNTER: self.total_counter = 1 @@ -176,10 +176,12 @@ def _set_installation_id(): def _export(event: typing.Dict[str, typing.Any]): - _executor.submit(requests.post, USAGE_ENDPOINT, json=event, timeout=30) + _executor.submit(requests.post, USAGE_ENDPOINT, json=event, timeout=2) def _produce_event(ctx: UsageContext): + if ctx.sampler and not ctx.sampler.should_record(): + return # Cannot check for unittest because typeguard pulls in unittest is_test = flags_helper.is_test() or bool({"pytest"} & sys.modules.keys()) event = { @@ -204,10 +206,6 @@ def 
_produce_event(ctx: UsageContext): **_constant_attributes, } event.update(ctx.attributes) - - if ctx.sampler and not ctx.sampler.should_record(event): - return - _export(event) @@ -262,6 +260,13 @@ def deeply_nested(...): """ sampler = attrs.pop("sampler", AlwaysSampler()) + def clear_context(ctx): + _context.set(UsageContext()) # reset context to default values + # TODO: Figure out why without this, new contexts.get aren't reset + ctx.call_stack = [] + ctx.completed_calls = [] + ctx.attributes = {} + def decorator(func): if not _is_enabled: return func @@ -295,17 +300,22 @@ def wrapper(*args, **kwargs): raise exc finally: - last_call = ctx.call_stack.pop(-1) - last_call.end = datetime.utcnow() - ctx.completed_calls.append(last_call) ctx.sampler = ( sampler if sampler.priority > ctx.sampler.priority else ctx.sampler ) + last_call = ctx.call_stack.pop(-1) + last_call.end = datetime.utcnow() + ctx.completed_calls.append(last_call) - if not ctx.call_stack: - # we reached the root of the stack - _context.set(UsageContext()) # reset context to default values + if not ctx.call_stack or ( + len(ctx.call_stack) == 1 + and "feast.feature_store.FeatureStore.serve" + in str(ctx.call_stack[0].fn_name) + ): + # When running `feast serve`, the serve method never exits so it gets + # stuck otherwise _produce_event(ctx) + clear_context(ctx) return wrapper diff --git a/sdk/python/tests/integration/e2e/test_validation.py b/sdk/python/tests/integration/e2e/test_validation.py index 771061b2069..f49ed80a265 100644 --- a/sdk/python/tests/integration/e2e/test_validation.py +++ b/sdk/python/tests/integration/e2e/test_validation.py @@ -104,6 +104,13 @@ def test_historical_retrieval_fails_on_validation(environment, universal_data_so features=_features, ) + ds = store.get_saved_dataset("my_other_dataset") + profiler_expectation_suite = ds.get_profile( + profiler=profiler_with_unrealistic_expectations + ) + + assert len(profiler_expectation_suite.expectation_suite["expectations"]) == 3 + with 
pytest.raises(ValidationFailed) as exc_info: job.to_df( validation_reference=store.get_saved_dataset( @@ -360,6 +367,7 @@ def profiler_with_feature_metadata(dataset: PandasDataset) -> ExpectationSuite: @ge_profiler def profiler_with_unrealistic_expectations(dataset: PandasDataset) -> ExpectationSuite: + # note: there are 4 expectations here and only 3 are returned from the profiler # need to create dataframe with corrupted data first df = pd.DataFrame() df["current_balance"] = [-100] @@ -371,5 +379,7 @@ def profiler_with_unrealistic_expectations(dataset: PandasDataset) -> Expectatio # this should pass other_ds.expect_column_min_to_be_between("avg_passenger_count", 0, 1000) + # this should fail + other_ds.expect_column_to_exist("missing random column") return other_ds.get_expectation_suite() diff --git a/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py index 7b8e5e80e67..124dd4c88d6 100644 --- a/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py +++ b/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py @@ -5,6 +5,8 @@ from typing import Any, Dict, List, Optional import pandas as pd +import pyarrow as pa +import pyarrow.parquet as pq from minio import Minio from testcontainers.core.generic import DockerContainer from testcontainers.core.waiting_utils import wait_for_logs @@ -87,6 +89,39 @@ def teardown(self): shutil.rmtree(d) +class FileParquetDatasetSourceCreator(FileDataSourceCreator): + def create_data_source( + self, + df: pd.DataFrame, + destination_name: str, + timestamp_field="ts", + created_timestamp_column="created_ts", + field_mapping: Dict[str, str] = None, + ) -> DataSource: + + destination_name = self.get_prefixed_table_name(destination_name) + + dataset_path = tempfile.TemporaryDirectory( + prefix=f"{self.project_name}_{destination_name}" + ) + table = pa.Table.from_pandas(df) + pq.write_to_dataset( + table, + 
base_dir=dataset_path.name, + compression="snappy", + format="parquet", + existing_data_behavior="overwrite_or_ignore", + ) + self.files.append(dataset_path.name) + return FileSource( + file_format=ParquetFormat(), + path=dataset_path.name, + timestamp_field=timestamp_field, + created_timestamp_column=created_timestamp_column, + field_mapping=field_mapping or {"ts_1": "ts"}, + ) + + class S3FileDataSourceCreator(DataSourceCreator): f: Any minio: DockerContainer diff --git a/sdk/python/tests/utils/e2e_test_validation.py b/sdk/python/tests/utils/e2e_test_validation.py index 43bdbefc004..bacc8c17206 100644 --- a/sdk/python/tests/utils/e2e_test_validation.py +++ b/sdk/python/tests/utils/e2e_test_validation.py @@ -27,6 +27,7 @@ ) from tests.integration.feature_repos.universal.data_sources.file import ( FileDataSourceCreator, + FileParquetDatasetSourceCreator, ) from tests.integration.feature_repos.universal.data_sources.redshift import ( RedshiftDataSourceCreator, @@ -211,6 +212,11 @@ def make_feature_store_yaml( offline_store_creator=FileDataSourceCreator, online_store=None, ), + IntegrationTestRepoConfig( + provider="local", + offline_store_creator=FileParquetDatasetSourceCreator, + online_store=None, + ), ] # Only test if this is NOT a local test diff --git a/setup.py b/setup.py index 2b09bcbe946..27f4ff7ed3c 100644 --- a/setup.py +++ b/setup.py @@ -43,7 +43,7 @@ DESCRIPTION = "Python SDK for Feast" URL = "https://github.com/feast-dev/feast" AUTHOR = "Feast" -REQUIRES_PYTHON = ">=3.7.0" +REQUIRES_PYTHON = ">=3.8.0" REQUIRED = [ "click>=7.0.0,<9.0.0", @@ -74,7 +74,7 @@ "typeguard", "fastapi>=0.68.0,<1", "uvicorn[standard]>=0.14.0,<1", - "dask>=2021.*,<2022.02.0", + "dask>=2021.*", "bowler", # Needed for automatic repo upgrades ] @@ -93,7 +93,7 @@ AWS_REQUIRED = ["boto3>=1.17.0,<=1.20.23", "docker>=5.0.2", "s3fs>=0.4.0,<=2022.01.0"] -BYTEWAX_REQUIRED = ["bytewax==0.10.0", "docker>=5.0.2", "kubernetes<=20.13.0"] +BYTEWAX_REQUIRED = ["bytewax==0.13.1", "docker>=5.0.2", 
"kubernetes<=20.13.0"] SNOWFLAKE_REQUIRED = [ "snowflake-connector-python[pandas]>=2.7.3,<3", diff --git a/ui/feature_repo/features.py b/ui/feature_repo/features.py index 293f438c67d..e02bb3de5d0 100644 --- a/ui/feature_repo/features.py +++ b/ui/feature_repo/features.py @@ -10,7 +10,10 @@ zipcode = Entity( name="zipcode", description="A zipcode", - tags={"owner": "danny@tecton.ai", "team": "hack week",}, + tags={ + "owner": "danny@tecton.ai", + "team": "hack week", + }, ) zipcode_source = FileSource( @@ -85,7 +88,10 @@ dob_ssn = Entity( name="dob_ssn", description="Date of birth and last four digits of social security number", - tags={"owner": "tony@tecton.ai", "team": "hack week",}, + tags={ + "owner": "tony@tecton.ai", + "team": "hack week", + }, ) credit_history_source = FileSource( @@ -123,14 +129,19 @@ # Define a request data source which encodes features / information only # available at request time (e.g. part of the user initiated HTTP request) input_request = RequestSource( - name="transaction", schema=[Field(name="transaction_amt", dtype=Int64),], + name="transaction", + schema=[ + Field(name="transaction_amt", dtype=Int64), + ], ) # Define an on demand feature view which can generate new features based on # existing feature views and RequestSource features @on_demand_feature_view( sources=[credit_history, input_request], - schema=[Field(name="transaction_gt_last_credit_card_due", dtype=Bool),], + schema=[ + Field(name="transaction_gt_last_credit_card_due", dtype=Bool), + ], ) def transaction_gt_last_credit_card_due(inputs: pd.DataFrame) -> pd.DataFrame: df = pd.DataFrame() @@ -173,14 +184,18 @@ def transaction_gt_last_credit_card_due(inputs: pd.DataFrame) -> pd.DataFrame: zipcode_model = FeatureService( name="zipcode_model", - features=[zipcode_features,], + features=[ + zipcode_features, + ], tags={"owner": "amanda@tecton.ai", "stage": "dev"}, description="Location model", ) zipcode_model_v2 = FeatureService( name="zipcode_model_v2", - 
features=[zipcode_money_features,], + features=[ + zipcode_money_features, + ], tags={"owner": "amanda@tecton.ai", "stage": "dev"}, description="Location model", ) diff --git a/ui/package.json b/ui/package.json index 6cb2fef5c3f..948c1655ba9 100644 --- a/ui/package.json +++ b/ui/package.json @@ -1,6 +1,6 @@ { "name": "@feast-dev/feast-ui", - "version": "0.26.0", + "version": "0.27.0", "private": false, "files": [ "dist" diff --git a/ui/src/pages/data-sources/DataSourcesListingTable.tsx b/ui/src/pages/data-sources/DataSourcesListingTable.tsx index 50c1f933a9c..ad549f991e0 100644 --- a/ui/src/pages/data-sources/DataSourcesListingTable.tsx +++ b/ui/src/pages/data-sources/DataSourcesListingTable.tsx @@ -33,8 +33,8 @@ const DatasourcesListingTable = ({ name: "Type", field: "type", sortable: true, - render: (valueType: feast.types.ValueType.Enum) => { - return feast.types.ValueType.Enum[valueType]; + render: (valueType: feast.core.DataSource.SourceType) => { + return feast.core.DataSource.SourceType[valueType]; }, }, ]; diff --git a/ui/yarn.lock b/ui/yarn.lock index 948eb78796e..16e34b029df 100644 --- a/ui/yarn.lock +++ b/ui/yarn.lock @@ -7224,9 +7224,9 @@ loader-runner@^4.2.0: integrity sha512-92+huvxMvYlMzMt0iIOukcwYBFpkYJdpl2xsZ7LrlayO7E8SOv+JJUEK17B/dJIHAOLMfh2dZZ/Y18WgmGtYNw== loader-utils@^1.4.0: - version "1.4.0" - resolved "https://registry.yarnpkg.com/loader-utils/-/loader-utils-1.4.0.tgz#c579b5e34cb34b1a74edc6c1fb36bfa371d5a613" - integrity sha512-qH0WSMBtn/oHuwjy/NucEgbx5dbxxnxup9s4PVXJUDHZBQY+s0NWA9rJf53RBnQZxfch7euUui7hpoAPvALZdA== + version "1.4.2" + resolved "https://registry.yarnpkg.com/loader-utils/-/loader-utils-1.4.2.tgz#29a957f3a63973883eb684f10ffd3d151fec01a3" + integrity sha512-I5d00Pd/jwMD2QCduo657+YM/6L3KZu++pmX9VFncxaxvHcru9jx1lBaFft+r4Mt2jK0Yhp41XlRAihzPxHNCg== dependencies: big.js "^5.2.2" emojis-list "^3.0.0" @@ -7551,11 +7551,16 @@ minimatch@^5.0.1: dependencies: brace-expansion "^2.0.1" -minimist@^1.1.1, minimist@^1.2.0, minimist@^1.2.5: 
+minimist@^1.1.1, minimist@^1.2.5: version "1.2.6" resolved "https://registry.yarnpkg.com/minimist/-/minimist-1.2.6.tgz#8637a5b759ea0d6e98702cfb3a9283323c93af44" integrity sha512-Jsjnk4bw3YJqYzbdyBiNsPWHPfO++UGG749Cxs6peCu5Xg4nrena6OVxOYxrQTqww0Jmwt+Ref8rggumkTLz9Q== +minimist@^1.2.0: + version "1.2.7" + resolved "https://registry.yarnpkg.com/minimist/-/minimist-1.2.7.tgz#daa1c4d91f507390437c6a8bc01078e7000c4d18" + integrity sha512-bzfL1YUZsP41gmu/qjrEk0Q6i2ix/cVeAhbCbqH9u3zYutS1cLg00qhrD0M2MVdCcx4Sc0UpP2eBWo9rotpq6g== + mkdirp@^0.5.5, mkdirp@~0.5.1: version "0.5.5" resolved "https://registry.yarnpkg.com/mkdirp/-/mkdirp-0.5.5.tgz#d91cefd62d1436ca0f41620e251288d420099def"