{ "cells": [ { "cell_type": "markdown", "id": "e2b3e96b", "metadata": {}, "source": [ "# Validating Historical Features with Great Expectations" ] }, { "cell_type": "markdown", "id": "a07ff5f8", "metadata": {}, "source": [ "![Validating Historical Features with Great Expectations](./dqm-diagram-1.png)" ] }, { "cell_type": "markdown", "id": "98678d3b", "metadata": {}, "source": [ "In this tutorial, we use the public Chicago taxi trips dataset to demonstrate Feast's data validation capabilities. The original dataset is stored in BigQuery and contains raw data for every taxi trip (one row per trip) since 2013. We will generate several training datasets (known as historical features in Feast) for different periods and evaluate expectations derived from one dataset against another. Our features are daily aggregations of the raw data (e.g., trips per day, average fare or speed for a specific day). We will compute some features in SQL while pulling data from BigQuery (such as total trip time or total miles travelled). The remaining features will be implemented as Feast on-demand transformations, which are calculated on the fly when requested." ] }, { "cell_type": "markdown", "id": "817a81e7", "metadata": {}, "source": [ "Our plan:\n", "\n", "0. Prepare the environment\n", "1. Pull data from BigQuery (optional)\n", "2. Declare & apply features and feature views in Feast\n", "3. Generate a reference dataset\n", "4. Develop & test a profiler function\n", "5. Run validation on a different dataset using the reference dataset & profiler" ] }, { "cell_type": "markdown", "id": "9b66b943", "metadata": {}, "source": [ "## 0. 
Setup" ] }, { "cell_type": "markdown", "id": "30caf020", "metadata": {}, "source": [ "Install the Feast Python SDK with the Great Expectations extra:" ] }, { "cell_type": "code", "execution_count": null, "id": "706e67d7", "metadata": {}, "outputs": [], "source": [ "!pip install 'feast[ge]'" ] }, { "cell_type": "code", "execution_count": null, "id": "9ca11e6d", "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "markdown", "id": "93dbebd8", "metadata": {}, "source": [ "## 1. Dataset preparation (optional)" ] }, { "cell_type": "markdown", "id": "e875bb35", "metadata": {}, "source": [ "**You can skip this step if you don't have a GCP account. In that case, use the parquet files that come with this tutorial instead.**" ] }, { "cell_type": "code", "execution_count": null, "id": "343f03da", "metadata": {}, "outputs": [], "source": [ "!pip install google-cloud-bigquery" ] }, { "cell_type": "code", "execution_count": 3, "id": "6787c9fa", "metadata": {}, "outputs": [], "source": [ "import pyarrow.parquet\n", "\n", "from google.cloud.bigquery import Client" ] }, { "cell_type": "code", "execution_count": 4, "id": "8cad9c80", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "/Users/pyalex/projects/feast/venv/lib/python3.7/site-packages/google/auth/_default.py:70: UserWarning: Your application has authenticated using end user credentials from Google Cloud SDK without a quota project. You might receive a \"quota exceeded\" or \"API not enabled\" error. We recommend you rerun `gcloud auth application-default login` and make sure a quota project is added. Or you can use service accounts instead. For more information about service accounts, see https://cloud.google.com/docs/authentication/\n", " warnings.warn(_CLOUD_SDK_CREDENTIALS_WARNING)\n" ] } ], "source": [ "bq_client = Client(project='kf-feast')" ] }, { "cell_type": "markdown", "id": "52db5709", "metadata": {}, "source": [ "Running some basic aggregations while pulling data from BigQuery. 
Grouping by taxi_id and day:" ] }, { "cell_type": "code", "execution_count": 5, "id": "a019d23d", "metadata": {}, "outputs": [], "source": [ "data_query = \"\"\"SELECT \n", " taxi_id,\n", " TIMESTAMP_TRUNC(trip_start_timestamp, DAY) as day,\n", " SUM(trip_miles) as total_miles_travelled,\n", " SUM(trip_seconds) as total_trip_seconds,\n", " SUM(fare) as total_earned,\n", " COUNT(*) as trip_count\n", "FROM `bigquery-public-data.chicago_taxi_trips.taxi_trips` \n", "WHERE \n", " trip_miles > 0 AND trip_seconds > 60 AND\n", " trip_start_timestamp BETWEEN '2019-01-01' and '2020-12-31' AND\n", " trip_total < 1000\n", "GROUP BY taxi_id, TIMESTAMP_TRUNC(trip_start_timestamp, DAY)\"\"\"" ] }, { "cell_type": "code", "execution_count": 6, "id": "0aa0ea00", "metadata": {}, "outputs": [], "source": [ "driver_stats_table = bq_client.query(data_query).to_arrow()\n", "\n", "# Store the resulting dataset in a parquet file\n", "pyarrow.parquet.write_table(driver_stats_table, \"trips_stats.parquet\")" ] }, { "cell_type": "code", "execution_count": 7, "id": "aa5d86c4", "metadata": {}, "outputs": [], "source": [ "def entities_query(year):\n", " return f\"\"\"SELECT\n", " distinct taxi_id\n", "FROM `bigquery-public-data.chicago_taxi_trips.taxi_trips` \n", "WHERE\n", " trip_miles > 0 AND trip_seconds > 0 AND\n", " trip_start_timestamp BETWEEN '{year}-01-01' and '{year}-12-31'\n", "\"\"\"" ] }, { "cell_type": "code", "execution_count": 8, "id": "e9cc7a21", "metadata": {}, "outputs": [], "source": [ "entities_2019_table = bq_client.query(entities_query(2019)).to_arrow()\n", "\n", "# Store the entities (taxi ids) in a parquet file\n", "pyarrow.parquet.write_table(entities_2019_table, \"entities.parquet\")" ] }, { "cell_type": "code", "execution_count": 9, "id": "644201fe", "metadata": {}, "outputs": [], "source": [ "#entities_2020_table = bq_client.query(entities_query(2020)).to_arrow()\n", "#pyarrow.parquet.write_table(entities_2020_table, \"entities_2020.parquet\")" ] }, { "cell_type": "code", 
"execution_count": null, "id": "10d40e89", "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "markdown", "id": "3b5faba1", "metadata": {}, "source": [ "## 2. Declaring features" ] }, { "cell_type": "code", "execution_count": 10, "id": "19cd609c", "metadata": {}, "outputs": [], "source": [ "import pyarrow.parquet\n", "import pandas as pd\n", "\n", "from feast import Feature, FeatureView, Entity, FeatureStore\n", "from feast.value_type import ValueType\n", "from feast.data_format import ParquetFormat\n", "from feast.on_demand_feature_view import on_demand_feature_view\n", "from feast.infra.offline_stores.file_source import FileSource\n", "from feast.infra.offline_stores.file import SavedDatasetFileStorage\n", "\n", "from google.protobuf.duration_pb2 import Duration" ] }, { "cell_type": "code", "execution_count": 11, "id": "79ff6a81", "metadata": {}, "outputs": [], "source": [ "batch_source = FileSource(\n", " event_timestamp_column=\"day\",\n", " path=\"trips_stats.parquet\", # using parquet file that we created on previous step\n", " file_format=ParquetFormat()\n", ")" ] }, { "cell_type": "code", "execution_count": 12, "id": "7cc59092", "metadata": {}, "outputs": [], "source": [ "taxi_entity = Entity(name='taxi', join_key='taxi_id')" ] }, { "cell_type": "code", "execution_count": 13, "id": "d0dc7fda", "metadata": {}, "outputs": [], "source": [ "trips_stats_fv = FeatureView(\n", " name='trip_stats',\n", " entities=['taxi'],\n", " features=[\n", " Feature(\"total_miles_travelled\", ValueType.DOUBLE),\n", " Feature(\"total_trip_seconds\", ValueType.DOUBLE),\n", " Feature(\"total_earned\", ValueType.DOUBLE),\n", " Feature(\"trip_count\", ValueType.INT64),\n", " \n", " ],\n", " ttl=Duration(seconds=86400),\n", " batch_source=batch_source,\n", ")" ] }, { "cell_type": "markdown", "id": "d52b1567", "metadata": {}, "source": [ "*Read more about feature views in [Feast docs](https://docs.feast.dev/getting-started/concepts/feature-view)*" ] }, { "cell_type": 
"code", "execution_count": 14, "id": "d706f6b6", "metadata": {}, "outputs": [], "source": [ "@on_demand_feature_view(\n", " features=[\n", " Feature(\"avg_fare\", ValueType.DOUBLE),\n", " Feature(\"avg_speed\", ValueType.DOUBLE),\n", " Feature(\"avg_trip_seconds\", ValueType.DOUBLE),\n", " Feature(\"earned_per_hour\", ValueType.DOUBLE),\n", " ],\n", " inputs={\n", " \"stats\": trips_stats_fv\n", " }\n", ")\n", "def on_demand_stats(inp):\n", " out = pd.DataFrame()\n", " out[\"avg_fare\"] = inp[\"total_earned\"] / inp[\"trip_count\"]\n", " out[\"avg_speed\"] = 3600 * inp[\"total_miles_travelled\"] / inp[\"total_trip_seconds\"]\n", " out[\"avg_trip_seconds\"] = inp[\"total_trip_seconds\"] / inp[\"trip_count\"]\n", " out[\"earned_per_hour\"] = 3600 * inp[\"total_earned\"] / inp[\"total_trip_seconds\"]\n", " return out" ] }, { "cell_type": "markdown", "id": "bcff9a2e", "metadata": {}, "source": [ "*Read more about on demand feature views [here](https://docs.feast.dev/reference/alpha-on-demand-feature-view)*" ] }, { "cell_type": "code", "execution_count": 15, "id": "be0c72e7", "metadata": {}, "outputs": [], "source": [ "store = FeatureStore(\".\") # using feature_store.yaml that stored in the same directory" ] }, { "cell_type": "code", "execution_count": 16, "id": "8935e813", "metadata": {}, "outputs": [], "source": [ "store.apply([taxi_entity, trips_stats_fv, on_demand_stats]) # writing to the registry" ] }, { "cell_type": "code", "execution_count": null, "id": "57ffb7c8", "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "markdown", "id": "45de2545", "metadata": {}, "source": [ "## 3. 
Generating training (reference) dataset" ] }, { "cell_type": "code", "execution_count": 17, "id": "03ba0273", "metadata": {}, "outputs": [], "source": [ "taxi_ids = pyarrow.parquet.read_table(\"entities.parquet\").to_pandas()" ] }, { "cell_type": "markdown", "id": "ee72c332", "metadata": {}, "source": [ "Generating a range of timestamps with daily frequency:" ] }, { "cell_type": "code", "execution_count": 18, "id": "a558e4b8", "metadata": {}, "outputs": [], "source": [ "timestamps = pd.DataFrame()\n", "timestamps[\"event_timestamp\"] = pd.date_range(\"2019-06-01\", \"2019-07-01\", freq='D')" ] }, { "cell_type": "markdown", "id": "e6db6725", "metadata": {}, "source": [ "A cross merge (i.e., the Cartesian product of the two tables) produces an entity dataframe with each taxi_id repeated for each timestamp:" ] }, { "cell_type": "code", "execution_count": 19, "id": "2f6ca83c", "metadata": {}, "outputs": [ { "data": { "text/html": [ "
" ], "text/plain": [ " taxi_id event_timestamp\n", "0 91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d... 2019-06-01\n", "1 91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d... 2019-06-02\n", "2 91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d... 2019-06-03\n", "3 91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d... 2019-06-04\n", "4 91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d... 2019-06-05\n", "... ... ...\n", "156979 7ebf27414a0c7b128e7925e1da56d51a8b81484f7630cf... 2019-06-27\n", "156980 7ebf27414a0c7b128e7925e1da56d51a8b81484f7630cf... 2019-06-28\n", "156981 7ebf27414a0c7b128e7925e1da56d51a8b81484f7630cf... 2019-06-29\n", "156982 7ebf27414a0c7b128e7925e1da56d51a8b81484f7630cf... 2019-06-30\n", "156983 7ebf27414a0c7b128e7925e1da56d51a8b81484f7630cf... 2019-07-01\n", "\n", "[156984 rows x 2 columns]" ] }, "execution_count": 19, "metadata": {}, "output_type": "execute_result" } ], "source": [ "entity_df = pd.merge(taxi_ids, timestamps, how='cross')\n", "entity_df" ] }, { "cell_type": "code", "execution_count": null, "id": "a1263617", "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "markdown", "id": "e2f39876", "metadata": {}, "source": [ "Retriving historical features for resulting entity dataframe and persisting output as a saved dataset:" ] }, { "cell_type": "code", "execution_count": 20, "id": "2c4af1e4", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "/Users/pyalex/projects/feast/sdk/python/feast/feature_store.py:853: RuntimeWarning: Saving dataset is an experimental feature. This API is unstable and it could and most probably will be changed in the future. 
We do not guarantee that future changes will maintain backward compatibility.\n", " RuntimeWarning,\n" ] }, { "data": { "text/plain": [ ", full_feature_names = False, tags = {}, _retrieval_job = , min_event_timestamp = 2019-06-01 00:00:00, max_event_timestamp = 2019-07-01 00:00:00)>" ] }, "execution_count": 20, "metadata": {}, "output_type": "execute_result" } ], "source": [ "job = store.get_historical_features(\n", " entity_df=entity_df,\n", " features=[\n", " \"trip_stats:total_miles_travelled\",\n", " \"trip_stats:total_trip_seconds\",\n", " \"trip_stats:total_earned\",\n", " \"trip_stats:trip_count\",\n", " \"on_demand_stats:avg_fare\",\n", " \"on_demand_stats:avg_trip_seconds\",\n", " \"on_demand_stats:avg_speed\",\n", " \"on_demand_stats:earned_per_hour\",\n", " ]\n", ")\n", "\n", "store.create_saved_dataset(\n", " from_=job,\n", " name='my_training_ds',\n", " storage=SavedDatasetFileStorage(path='my_training_ds.parquet')\n", ")" ] }, { "cell_type": "code", "execution_count": null, "id": "370a180b", "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "markdown", "id": "439de503", "metadata": {}, "source": [ "## 4. Developing dataset profiler" ] }, { "cell_type": "markdown", "id": "488b14d9", "metadata": {}, "source": [ "A dataset profiler is a function that accepts a dataset and generates a set of its characteristics. These characteristics are then used to evaluate (validate) subsequent datasets." ] }, { "cell_type": "markdown", "id": "32d2c8b2", "metadata": {}, "source": [ "**Important: datasets are not compared to each other! \n", "Feast uses a reference dataset and a profiler function to generate a reference profile. 
\n", "This profile will then be used to validate the tested dataset.**" ] }, { "cell_type": "code", "execution_count": 21, "id": "a46f571f", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "02/02/2022 02:43:45 PM WARNING:/Users/pyalex/projects/feast/venv/lib/python3.7/site-packages/great_expectations/render/view/view.py:116: DeprecationWarning: 'contextfilter' is renamed to 'pass_context', the old name will be removed in Jinja 3.1.\n", " def add_data_context_id_to_url(self, jinja_context, url, add_datetime=True):\n", "\n" ] } ], "source": [ "import numpy as np\n", "\n", "from feast.dqm.profilers.ge_profiler import ge_profiler\n", "\n", "from great_expectations.core.expectation_suite import ExpectationSuite\n", "from great_expectations.dataset import PandasDataset" ] }, { "cell_type": "markdown", "id": "4d157e99", "metadata": {}, "source": [ "Loading the saved dataset first and exploring the data:" ] }, { "cell_type": "code", "execution_count": 22, "id": "e57a229b", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "/Users/pyalex/projects/feast/sdk/python/feast/feature_store.py:904: RuntimeWarning: Retrieving datasets is an experimental feature. This API is unstable and it could and most probably will be changed in the future. We do not guarantee that future changes will maintain backward compatibility.\n", " RuntimeWarning,\n" ] }, { "data": { "text/html": [ "
" ], "text/plain": [ " total_earned avg_trip_seconds \\\n", "0 68.25 2270.000000 \n", "1 221.00 560.500000 \n", "2 160.50 1010.769231 \n", "3 183.75 697.550000 \n", "4 217.75 1054.076923 \n", "... ... ... \n", "156979 38.00 1980.000000 \n", "156980 135.00 551.250000 \n", "156981 NaN NaN \n", "156982 63.00 815.000000 \n", "156983 NaN NaN \n", "\n", " taxi_id \\\n", "0 91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d... \n", "1 7a4a6162eaf27805aef407d25d5cb21fe779cd962922cb... \n", "2 f4c9d05b215d7cbd08eca76252dae51cdb7aca9651d4ef... \n", "3 c1f533318f8480a59173a9728ea0248c0d3eb187f4b897... \n", "4 455b6b5cae6ca5a17cddd251485f2266d13d6a2c92f07c... \n", "... ... \n", "156979 0cccf0ec1f46d1e0beefcfdeaf5188d67e170cdff92618... \n", "156980 beefd3462e3f5a8e854942a2796876f6db73ebbd25b435... \n", "156981 9a3c52aa112f46cf0d129fafbd42051b0fb9b0ff8dcb0e... \n", "156982 08308c31cd99f495dea73ca276d19a6258d7b4c9c88e43... \n", "156983 7ebf27414a0c7b128e7925e1da56d51a8b81484f7630cf... \n", "\n", " total_miles_travelled trip_count earned_per_hour \\\n", "0 24.70 2.0 54.118943 \n", "1 54.18 24.0 59.143622 \n", "2 41.30 13.0 43.972603 \n", "3 37.30 20.0 47.415956 \n", "4 69.69 13.0 57.206451 \n", "... ... ... ... \n", "156979 14.90 1.0 69.090909 \n", "156980 28.40 16.0 55.102041 \n", "156981 NaN NaN NaN \n", "156982 19.96 4.0 69.570552 \n", "156983 NaN NaN NaN \n", "\n", " event_timestamp total_trip_seconds avg_fare avg_speed \n", "0 2019-06-01 00:00:00+00:00 4540.0 34.125000 19.585903 \n", "1 2019-06-01 00:00:00+00:00 13452.0 9.208333 14.499554 \n", "2 2019-06-01 00:00:00+00:00 13140.0 12.346154 11.315068 \n", "3 2019-06-01 00:00:00+00:00 13951.0 9.187500 9.625116 \n", "4 2019-06-01 00:00:00+00:00 13703.0 16.750000 18.308692 \n", "... ... ... ... ... 
\n", "156979 2019-07-01 00:00:00+00:00 1980.0 38.000000 27.090909 \n", "156980 2019-07-01 00:00:00+00:00 8820.0 8.437500 11.591837 \n", "156981 2019-07-01 00:00:00+00:00 NaN NaN NaN \n", "156982 2019-07-01 00:00:00+00:00 3260.0 15.750000 22.041718 \n", "156983 2019-07-01 00:00:00+00:00 NaN NaN NaN \n", "\n", "[156984 rows x 10 columns]" ] }, "execution_count": 22, "metadata": {}, "output_type": "execute_result" } ], "source": [ "ds = store.get_saved_dataset('my_training_ds')\n", "ds.to_df()" ] }, { "cell_type": "markdown", "id": "73feb39d", "metadata": {}, "source": [ "Feast uses [Great Expectations](https://docs.greatexpectations.io/docs/) as its validation engine and [ExpectationSuite](https://legacy.docs.greatexpectations.io/en/latest/autoapi/great_expectations/core/expectation_suite/index.html#great_expectations.core.expectation_suite.ExpectationSuite) as a dataset's profile. Hence, we need to develop a function that generates an ExpectationSuite. This function receives an instance of [PandasDataset](https://legacy.docs.greatexpectations.io/en/latest/autoapi/great_expectations/dataset/index.html?highlight=pandasdataset#great_expectations.dataset.PandasDataset) (a wrapper around pandas.DataFrame), so we can use both the pandas DataFrame API and the helper functions from PandasDataset during profiling." 
] }, { "cell_type": "code", "execution_count": 23, "id": "b3a8714f", "metadata": {}, "outputs": [], "source": [ "DELTA = 0.1 # controlling allowed window in fraction of the value on scale [0, 1]\n", "\n", "@ge_profiler\n", "def stats_profiler(ds: PandasDataset) -> ExpectationSuite:\n", " # simple checks on data consistency\n", " ds.expect_column_values_to_be_between(\n", " \"avg_speed\",\n", " min_value=0,\n", " max_value=60,\n", " mostly=0.99 # allow some outliers\n", " )\n", " \n", " ds.expect_column_values_to_be_between(\n", " \"total_miles_travelled\",\n", " min_value=0,\n", " max_value=500,\n", " mostly=0.99 # allow some outliers\n", " )\n", " \n", " # expectation of means based on observed values\n", " observed_mean = ds.trip_count.mean()\n", " ds.expect_column_mean_to_be_between(\"trip_count\",\n", " min_value=observed_mean * (1 - DELTA),\n", " max_value=observed_mean * (1 + DELTA))\n", " \n", " observed_mean = ds.earned_per_hour.mean()\n", " ds.expect_column_mean_to_be_between(\"earned_per_hour\",\n", " min_value=observed_mean * (1 - DELTA),\n", " max_value=observed_mean * (1 + DELTA))\n", " \n", " \n", " # expectation of quantiles\n", " qs = [0.5, 0.75, 0.9, 0.95]\n", " observed_quantiles = ds.avg_fare.quantile(qs)\n", " \n", " ds.expect_column_quantile_values_to_be_between(\n", " \"avg_fare\",\n", " quantile_ranges={\n", " \"quantiles\": qs,\n", " \"value_ranges\": [[None, max_value] for max_value in observed_quantiles]\n", " }) \n", " \n", " return ds.get_expectation_suite()" ] }, { "cell_type": "markdown", "id": "d3f2e171", "metadata": {}, "source": [ "Testing our profiler function:" ] }, { "cell_type": "code", "execution_count": 24, "id": "504e5699", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "02/02/2022 02:43:47 PM INFO:\t5 expectation(s) included in expectation_suite. 
result_format settings filtered.\n" ] }, { "data": { "text/plain": [ "" ] }, "execution_count": 24, "metadata": {}, "output_type": "execute_result" } ], "source": [ "ds.get_profile(profiler=stats_profiler)" ] }, { "cell_type": "markdown", "id": "ebb184b9", "metadata": {}, "source": [ "**Verify that all the expectations coded in the profiler are present here. If an expectation is missing, it failed on the reference dataset; Great Expectations drops failed expectations silently by default.**" ] }, { "cell_type": "markdown", "id": "32709fff", "metadata": {}, "source": [ "Now we can create a validation reference from the dataset and the profiler function:" ] }, { "cell_type": "code", "execution_count": 25, "id": "3cc27606", "metadata": {}, "outputs": [], "source": [ "validation_reference = ds.as_reference(profiler=stats_profiler)" ] }, { "cell_type": "markdown", "id": "983a9300", "metadata": {}, "source": [ "and test it against our existing retrieval job:" ] }, { "cell_type": "code", "execution_count": 26, "id": "ba72e02a", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "/Users/pyalex/projects/feast/sdk/python/feast/infra/offline_stores/offline_store.py:93: RuntimeWarning: Dataset validation is an experimental feature. This API is unstable and it could and most probably will be changed in the future. We do not guarantee that future changes will maintain backward compatibility.\n", " RuntimeWarning,\n", "02/02/2022 02:43:52 PM INFO:\t5 expectation(s) included in expectation_suite. result_format settings filtered.\n", "02/02/2022 02:43:53 PM INFO:Validating data_asset_name None with expectation_suite_name default\n" ] } ], "source": [ "_ = job.to_df(validation_reference=validation_reference)" ] }, { "cell_type": "markdown", "id": "e0604b9f", "metadata": {}, "source": [ "Validation passed successfully, as no exception was raised." 
] }, { "cell_type": "code", "execution_count": null, "id": "f7540989", "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "markdown", "id": "0b6d4338", "metadata": {}, "source": [ "## 5. Validating a new historical retrieval" ] }, { "cell_type": "markdown", "id": "c3b16c93", "metadata": {}, "source": [ "Creating new timestamps for the first week of December 2020:" ] }, { "cell_type": "code", "execution_count": 27, "id": "25450aab", "metadata": {}, "outputs": [], "source": [ "from feast.dqm.errors import ValidationFailed" ] }, { "cell_type": "code", "execution_count": 28, "id": "d7f6e892", "metadata": {}, "outputs": [], "source": [ "timestamps = pd.DataFrame()\n", "timestamps[\"event_timestamp\"] = pd.date_range(\"2020-12-01\", \"2020-12-07\", freq='D')" ] }, { "cell_type": "code", "execution_count": 29, "id": "3a5d3b7b", "metadata": {}, "outputs": [ { "data": { "text/html": [ "
" ], "text/plain": [ " taxi_id event_timestamp\n", "0 91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d... 2020-12-01\n", "1 91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d... 2020-12-02\n", "2 91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d... 2020-12-03\n", "3 91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d... 2020-12-04\n", "4 91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d... 2020-12-05\n", "... ... ...\n", "35443 7ebf27414a0c7b128e7925e1da56d51a8b81484f7630cf... 2020-12-03\n", "35444 7ebf27414a0c7b128e7925e1da56d51a8b81484f7630cf... 2020-12-04\n", "35445 7ebf27414a0c7b128e7925e1da56d51a8b81484f7630cf... 2020-12-05\n", "35446 7ebf27414a0c7b128e7925e1da56d51a8b81484f7630cf... 2020-12-06\n", "35447 7ebf27414a0c7b128e7925e1da56d51a8b81484f7630cf... 2020-12-07\n", "\n", "[35448 rows x 2 columns]" ] }, "execution_count": 29, "metadata": {}, "output_type": "execute_result" } ], "source": [ "entity_df = pd.merge(taxi_ids, timestamps, how='cross')\n", "entity_df" ] }, { "cell_type": "code", "execution_count": 30, "id": "5be40a91", "metadata": {}, "outputs": [], "source": [ "job = store.get_historical_features(\n", " entity_df=entity_df,\n", " features=[\n", " \"trip_stats:total_miles_travelled\",\n", " \"trip_stats:total_trip_seconds\",\n", " \"trip_stats:total_earned\",\n", " \"trip_stats:trip_count\",\n", " \"on_demand_stats:avg_fare\",\n", " \"on_demand_stats:avg_trip_seconds\",\n", " \"on_demand_stats:avg_speed\",\n", " \"on_demand_stats:earned_per_hour\",\n", " ]\n", ")" ] }, { "cell_type": "markdown", "id": "1b12295b", "metadata": {}, "source": [ "Execute retrieval job with validation reference:" ] }, { "cell_type": "code", "execution_count": 31, "id": "3279f2ac", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "/Users/pyalex/projects/feast/sdk/python/feast/infra/offline_stores/offline_store.py:93: RuntimeWarning: Dataset validation is an experimental feature. 
This API is unstable and it could and most probably will be changed in the future. We do not guarantee that future changes will maintain backward compatibility.\n", " RuntimeWarning,\n", "02/02/2022 02:43:58 PM INFO:\t5 expectation(s) included in expectation_suite. result_format settings filtered.\n", "02/02/2022 02:43:59 PM INFO:Validating data_asset_name None with expectation_suite_name default\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "[\n", " {\n", " \"expectation_config\": {\n", " \"expectation_type\": \"expect_column_mean_to_be_between\",\n", " \"kwargs\": {\n", " \"column\": \"trip_count\",\n", " \"min_value\": 10.387244591346153,\n", " \"max_value\": 12.695521167200855,\n", " \"result_format\": \"COMPLETE\"\n", " },\n", " \"meta\": {}\n", " },\n", " \"meta\": {},\n", " \"result\": {\n", " \"observed_value\": 6.692920555429092,\n", " \"element_count\": 35448,\n", " \"missing_count\": 31055,\n", " \"missing_percent\": 87.6071992778154\n", " },\n", " \"exception_info\": {\n", " \"raised_exception\": false,\n", " \"exception_message\": null,\n", " \"exception_traceback\": null\n", " },\n", " \"success\": false\n", " },\n", " {\n", " \"expectation_config\": {\n", " \"expectation_type\": \"expect_column_mean_to_be_between\",\n", " \"kwargs\": {\n", " \"column\": \"earned_per_hour\",\n", " \"min_value\": 52.320624975640214,\n", " \"max_value\": 63.94743052578249,\n", " \"result_format\": \"COMPLETE\"\n", " },\n", " \"meta\": {}\n", " },\n", " \"meta\": {},\n", " \"result\": {\n", " \"observed_value\": 68.99268345164135,\n", " \"element_count\": 35448,\n", " \"missing_count\": 31055,\n", " \"missing_percent\": 87.6071992778154\n", " },\n", " \"exception_info\": {\n", " \"raised_exception\": false,\n", " \"exception_message\": null,\n", " \"exception_traceback\": null\n", " },\n", " \"success\": false\n", " },\n", " {\n", " \"expectation_config\": {\n", " \"expectation_type\": \"expect_column_quantile_values_to_be_between\",\n", " \"kwargs\": 
{\n", " \"column\": \"avg_fare\",\n", " \"quantile_ranges\": {\n", " \"quantiles\": [\n", " 0.5,\n", " 0.75,\n", " 0.9,\n", " 0.95\n", " ],\n", " \"value_ranges\": [\n", " [\n", " null,\n", " 16.4\n", " ],\n", " [\n", " null,\n", " 26.229166666666668\n", " ],\n", " [\n", " null,\n", " 36.4375\n", " ],\n", " [\n", " null,\n", " 42.0\n", " ]\n", " ]\n", " },\n", " \"result_format\": \"COMPLETE\"\n", " },\n", " \"meta\": {}\n", " },\n", " \"meta\": {},\n", " \"result\": {\n", " \"observed_value\": {\n", " \"quantiles\": [\n", " 0.5,\n", " 0.75,\n", " 0.9,\n", " 0.95\n", " ],\n", " \"values\": [\n", " 19.5,\n", " 28.1,\n", " 38.0,\n", " 44.125\n", " ]\n", " },\n", " \"element_count\": 35448,\n", " \"missing_count\": 31055,\n", " \"missing_percent\": 87.6071992778154,\n", " \"details\": {\n", " \"success_details\": [\n", " false,\n", " false,\n", " false,\n", " false\n", " ]\n", " }\n", " },\n", " \"exception_info\": {\n", " \"raised_exception\": false,\n", " \"exception_message\": null,\n", " \"exception_traceback\": null\n", " },\n", " \"success\": false\n", " }\n", "]\n" ] } ], "source": [ "try:\n", " df = job.to_df(validation_reference=validation_reference)\n", "except ValidationFailed as exc:\n", " print(exc.validation_report)" ] }, { "cell_type": "markdown", "id": "516f1935", "metadata": {}, "source": [ "Validation failed since several expectations didn't pass:\n", "* Trip count (mean) decreased by more than 10% (which is expected when comparing Dec 2020 with June 2019)\n", "* Average fare increased: all quantiles are higher than expected\n", "* Earnings per hour (mean) increased by more than 10% (most probably due to the increased fare)" ] }, { "cell_type": "code", "execution_count": null, "id": "cb9d59c9", "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", 
"mimetype": 
"text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.12" } }, "nbformat": 4, "nbformat_minor": 5 }