{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Feast Basic Customer Transactions Example" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This is a minimal example of using Feast. In this example we will\n", "1. Create a synthetic customer feature dataset\n", "2. Register a feature set to represent these features in Feast\n", "3. Ingest these features into Feast\n", "4. Create a feature query and retrieve online feature data\n", "5. Create a feature query and retrieve historical feature data" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 0. Configuration" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "\n", "# Feast Core acts as the central feature registry\n", "FEAST_CORE_URL = os.getenv('FEAST_CORE_URL', 'core:6565')\n", "\n", "# Feast Online Serving allows for the retrieval of real-time feature data\n", "FEAST_ONLINE_SERVING_URL = os.getenv('FEAST_ONLINE_SERVING_URL', 'online-serving:6566')\n", "\n", "# Feast Batch Serving allows for the retrieval of historical feature data\n", "FEAST_BATCH_SERVING_URL = os.getenv('FEAST_BATCH_SERVING_URL', 'batch-serving:6567')\n", "\n", "# PYTHON_REPOSITORY_PATH is the path to the Python SDK inside the Feast Git Repo\n", "PYTHON_REPOSITORY_PATH = os.getenv('PYTHON_REPOSITORY_PATH', '../../')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 1. 
Install Feast SDK" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Install from PyPI" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!pip install --ignore-installed --upgrade feast" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "(Alternative) Install from local repository" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "import sys\n", "os.environ['PYTHON_SDK_PATH'] = os.path.join(PYTHON_REPOSITORY_PATH, 'sdk/python')\n", "sys.path.append(os.environ['PYTHON_SDK_PATH'])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!echo $PYTHON_SDK_PATH" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!python -m pip install --ignore-installed --upgrade -e ${PYTHON_SDK_PATH}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 2. Import necessary modules" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "import numpy as np\n", "from pytz import timezone, utc\n", "from feast import Client, FeatureSet, Entity, ValueType\n", "from feast.serving.ServingService_pb2 import GetOnlineFeaturesRequest\n", "from feast.types.Value_pb2 import Value as Value\n", "from google.protobuf.duration_pb2 import Duration\n", "from datetime import datetime, timedelta\n", "from random import randrange\n", "import random" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 3. 
Configure Feast services and connect the Feast client\n", "\n", "Connect to Feast Core and Feast Online Serving" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "client = Client(core_url=FEAST_CORE_URL, serving_url=FEAST_ONLINE_SERVING_URL)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Create a project workspace" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "client.create_project('customer_project')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Set the active project" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "client.set_project('customer_project')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 4. Create customer features" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Here we will create customer features for 5 customers over a month. Each customer will have a set of features for every day." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "days = [datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0).replace(tzinfo=utc) \\\n", " - timedelta(day) for day in range(31)]\n", "\n", "customers = [1001, 1002, 1003, 1004, 1005]" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "customer_features = pd.DataFrame(\n", " {\n", " \"datetime\": [day for day in days for customer in customers],\n", " \"customer_id\": [customer for day in days for customer in customers],\n", " \"daily_transactions\": [np.random.rand() * 10 for _ in range(len(days) * len(customers))],\n", " \"total_transactions\": [np.random.randint(100) for _ in range(len(days) * len(customers))],\n", " }\n", ")\n", "\n", "print(customer_features.head(500))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 5. 
Create feature set for customer features" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we will create a feature set for these features. A feature set is essentially a schema that represents\n", "feature values. Feature sets allow Feast to identify both feature values and their structure. The following feature set contains no features yet." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "customer_fs = FeatureSet(\n", " \"customer_transactions\",\n", " entities=[Entity(name='customer_id', dtype=ValueType.INT64)],\n", " max_age=Duration(seconds=432000) \n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Here we automatically infer the schema from the provided dataset. The two features from the dataset will be added to the feature set." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "customer_fs.infer_fields_from_df(customer_features, replace_existing_features=True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 6. Register feature set with Feast Core" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The apply() method registers the provided feature set with Feast Core, allowing users to retrieve features from this feature set." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "client.apply(customer_fs)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We retrieve this feature set object (not its data) to ensure that we have the latest version." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "customer_fs = client.get_feature_set(\"customer_transactions\")\n", "print(customer_fs)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 7. 
Ingest data into Feast for a feature set" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "client.ingest(\"customer_transactions\", customer_features)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 8. Retrieve online features" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The process of retrieving features from the online API is very similar to that of the batch API. The only major difference is that users do not have to provide timestamps: only the latest features are returned, as long as they are within the maximum age window." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The example below retrieves online features for a single customer: \"1001\". It is possible to retrieve any features from Feast, even outside of the current project." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "online_features = client.get_online_features(\n", " feature_refs=[\n", " f\"daily_transactions\",\n", " f\"total_transactions\",\n", " ],\n", " entity_rows=[\n", " GetOnlineFeaturesRequest.EntityRow(\n", " fields={\n", " \"customer_id\": Value(\n", " int64_val=1001)\n", " }\n", " )\n", " ],\n", ")\n", "print(online_features)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ " " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### The following section requires Google Cloud Platform (Google Cloud Storage and BigQuery)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 9. Create a batch retrieval query" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In order to retrieve historical feature data, the user must provide an entity_rows dataframe. This dataframe contains a combination of timestamps and entities. In this case, the user must provide both customer_ids and timestamps. \n", "\n", "We will randomly generate 30 timestamps within the last 15 days, and assign customer_ids to them. 
When these entity rows are sent to the Feast Serving API to retrieve feature values, along with a list of feature ids, Feast is then able to attach the correct feature values to each entity row. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "event_timestamps = [datetime.utcnow().replace(tzinfo=utc) - timedelta(days=randrange(15), hours=randrange(24), minutes=randrange(60)) for day in range(30)]\n", "\n", "entity_rows = pd.DataFrame(\n", " {\n", " \"datetime\": event_timestamps,\n", " \"customer_id\": [customers[idx % len(customers)] for idx in range(len(event_timestamps))],\n", " }\n", ")\n", "\n", "print(entity_rows.head(10))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 10. Retrieve historical/batch features" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next we will create a new client object, but this time we will configure it to connect to the Batch Serving Service. This service will allow us to retrieve historical feature data." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "batch_client = Client(core_url=FEAST_CORE_URL, serving_url=FEAST_BATCH_SERVING_URL)\n", "batch_client.set_project(\"customer_project\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "By calling the `get_batch_features` method we are able to retrieve a `job` object for the exporting of feature data. For every entity and timestamp combination in `entity_rows` we will be receiving a row with feature values joined to it." 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "job = batch_client.get_batch_features(\n", " feature_refs=[\n", " f\"customer_project/daily_transactions\", \n", " f\"customer_project/total_transactions\", \n", " ],\n", " entity_rows=entity_rows\n", " )" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Once the job is complete, it is possible to retrieve the exported data (from Google Cloud Storage) and load it into memory as a Pandas Dataframe." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df = job.to_dataframe()\n", "print(df.head(10))" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.4" }, "pycharm": { "stem_cell": { "cell_type": "raw", "metadata": { "collapsed": false }, "source": [] } } }, "nbformat": 4, "nbformat_minor": 2 }