From b1b9110b41efcee5b9682b6a822140c2e3e4c1ef Mon Sep 17 00:00:00 2001 From: Chaitany patel Date: Mon, 16 Mar 2026 14:15:41 +0530 Subject: [PATCH] Added Agent Skills for the feast user Signed-off-by: Chaitany patel --- skills/SKILL.md | 253 ++++++++++++++++ skills/references/configuration.md | 284 ++++++++++++++++++ skills/references/feature-definitions.md | 350 +++++++++++++++++++++++ skills/references/retrieval-and-rag.md | 287 +++++++++++++++++++ 4 files changed, 1174 insertions(+) create mode 100644 skills/SKILL.md create mode 100644 skills/references/configuration.md create mode 100644 skills/references/feature-definitions.md create mode 100644 skills/references/retrieval-and-rag.md diff --git a/skills/SKILL.md b/skills/SKILL.md new file mode 100644 index 00000000000..8be173257ac --- /dev/null +++ b/skills/SKILL.md @@ -0,0 +1,253 @@ +--- +name: feast-user-guide +description: Guide for working with Feast (Feature Store) — defining features, configuring feature_store.yaml, retrieving features online/offline, using the CLI, and building RAG retrieval pipelines. Use when the user asks about creating entities, feature views, on-demand feature views, stream feature views, feature services, data sources, feature_store.yaml configuration, feast apply/materialize commands, online or historical feature retrieval, or vector-based document retrieval with Feast. +license: Apache-2.0 +compatibility: Works with Claude Code, OpenAI Codex, and any Agent Skills compatible tool. +metadata: + author: feast-dev + version: "1.0" +--- + +# Feast User Guide + +## Quick Start + +A Feast project requires: +1. A `feature_store.yaml` config file +2. Python files defining entities, data sources, feature views, and feature services +3. Running `feast apply` to register definitions + +```bash +feast init my_project +cd my_project +feast apply +``` + +## Core Concepts + +### Entity +An entity is a collection of semantically related features (e.g., a customer, a driver). 
Entities have join keys used to look up features.
+
+```python
+from feast import Entity
+
+driver = Entity(
+    name="driver_id",
+    join_keys=["driver_id"],
+    description="Driver identifier",
+)
+```
+
+Note: `value_type` on `Entity` is deprecated; declare the lookup key with `join_keys` instead.
+
+### Data Sources
+Data sources describe where raw feature data lives.
+
+```python
+from feast import Field, FileSource, BigQuerySource, KafkaSource, PushSource, RequestSource
+from feast.types import Float64
+
+# Batch source (file)
+driver_stats_source = FileSource(
+    name="driver_stats_source",
+    path="data/driver_stats.parquet",
+    timestamp_field="event_timestamp",
+    created_timestamp_column="created",
+)
+
+# Request source (for on-demand features)
+input_request = RequestSource(
+    name="vals_to_add",
+    schema=[Field(name="val_to_add", dtype=Float64)],
+)
+```
+
+### FeatureView
+Maps features from a data source to entities with a schema, TTL, and online/offline settings.
+
+```python
+from feast import FeatureView, Field
+from feast.types import Float32, Int64, String
+from datetime import timedelta
+
+driver_hourly_stats = FeatureView(
+    name="driver_hourly_stats",
+    entities=[driver],
+    ttl=timedelta(days=365),
+    schema=[
+        Field(name="conv_rate", dtype=Float32),
+        Field(name="acc_rate", dtype=Float32),
+        Field(name="avg_daily_trips", dtype=Int64),
+    ],
+    online=True,
+    source=driver_stats_source,
+)
+```
+
+### OnDemandFeatureView
+Computes features at request time from other feature views and/or request data.
+
+```python
+from feast import on_demand_feature_view
+import pandas as pd
+
+@on_demand_feature_view(
+    sources=[driver_hourly_stats, input_request],
+    schema=[Field(name="conv_rate_plus_val", dtype=Float64)],
+    mode="pandas",
+)
+def transformed_conv_rate(inputs: pd.DataFrame) -> pd.DataFrame:
+    df = pd.DataFrame()
+    df["conv_rate_plus_val"] = inputs["conv_rate"] + inputs["val_to_add"]
+    return df
+```
+
+### FeatureService
+Groups features from multiple views for retrieval. 
+ +```python +from feast import FeatureService + +driver_fs = FeatureService( + name="driver_ranking", + features=[driver_hourly_stats, transformed_conv_rate], +) +``` + +## Feature Retrieval + +### Online (low-latency) +```python +from feast import FeatureStore + +store = FeatureStore(repo_path=".") + +features = store.get_online_features( + features=[ + "driver_hourly_stats:conv_rate", + "driver_hourly_stats:acc_rate", + ], + entity_rows=[{"driver_id": 1001}, {"driver_id": 1002}], +).to_dict() +``` + +### Historical (training data with point-in-time joins) +```python +entity_df = pd.DataFrame({ + "driver_id": [1001, 1002], + "event_timestamp": [datetime(2023, 1, 1), datetime(2023, 1, 2)], +}) + +training_df = store.get_historical_features( + entity_df=entity_df, + features=["driver_hourly_stats:conv_rate", "driver_hourly_stats:acc_rate"], +).to_df() +``` + +Or use a FeatureService: +```python +training_df = store.get_historical_features( + entity_df=entity_df, + features=driver_fs, +).to_df() +``` + +## Materialization + +Load features from offline store into online store: + +```bash +# Full materialization over a time range +feast materialize 2023-01-01T00:00:00 2023-12-31T23:59:59 + +# Incremental (from last materialized timestamp) +feast materialize-incremental $(date -u +"%Y-%m-%dT%H:%M:%S") +``` + +Python API: +```python +from datetime import datetime +store.materialize(start_date=datetime(2023, 1, 1), end_date=datetime(2023, 12, 31)) +store.materialize_incremental(end_date=datetime.utcnow()) +``` + +## CLI Commands + +| Command | Purpose | +|---------|---------| +| `feast init [DIR]` | Create new feature repository | +| `feast apply` | Register/update feature definitions | +| `feast plan` | Preview changes without applying | +| `feast materialize START END` | Materialize features to online store | +| `feast materialize-incremental END` | Incremental materialization | +| `feast entities list` | List registered entities | +| `feast feature-views list` | List 
feature views | +| `feast feature-services list` | List feature services | +| `feast on-demand-feature-views list` | List on-demand feature views | +| `feast teardown` | Remove infrastructure resources | +| `feast version` | Show SDK version | + +Options: `--chdir` / `-c` (run in different directory), `--feature-store-yaml` / `-f` (override config path). + +## Vector Search / RAG + +Define a feature view with vector fields for similarity search: + +```python +from feast.types import Array, Float32 + +wiki_passages = FeatureView( + name="wiki_passages", + entities=[passage_entity], + schema=[ + Field(name="passage_text", dtype=String), + Field( + name="embedding", + dtype=Array(Float32), + vector_index=True, + vector_length=384, + vector_search_metric="COSINE", + ), + ], + source=passages_source, + online=True, +) +``` + +Retrieve similar documents: +```python +results = store.retrieve_online_documents( + feature="wiki_passages:embedding", + query=query_embedding, + top_k=5, +) +``` + +## feature_store.yaml Minimal Config + +```yaml +project: my_project +registry: data/registry.db +provider: local +online_store: + type: sqlite + path: data/online_store.db +``` + +## Common Imports + +```python +from feast import ( + Entity, FeatureView, OnDemandFeatureView, FeatureService, + Field, FileSource, RequestSource, FeatureStore, +) +from feast.on_demand_feature_view import on_demand_feature_view +from feast.types import Float32, Float64, Int64, String, Bool, Array +from feast.value_type import ValueType +from datetime import timedelta +``` + +## Detailed References + +- **Feature definitions** (all types, parameters, patterns): See [references/feature-definitions.md](references/feature-definitions.md) +- **Configuration** (feature_store.yaml, all store types, auth): See [references/configuration.md](references/configuration.md) +- **Retrieval & RAG** (online/offline retrieval, vector search, RAG retriever): See 
[references/retrieval-and-rag.md](references/retrieval-and-rag.md) diff --git a/skills/references/configuration.md b/skills/references/configuration.md new file mode 100644 index 00000000000..9a1720984b1 --- /dev/null +++ b/skills/references/configuration.md @@ -0,0 +1,284 @@ +# Configuration Reference + +## Table of Contents +- [feature_store.yaml](#feature_storeyaml) +- [RepoConfig Fields](#repoconfig-fields) +- [Registry Configuration](#registry-configuration) +- [Online Store Types](#online-store-types) +- [Offline Store Types](#offline-store-types) +- [Batch Engine Types](#batch-engine-types) +- [Authentication](#authentication) +- [Feature Server](#feature-server) +- [Materialization Config](#materialization-config) +- [OpenLineage Config](#openlineage-config) +- [Feature Repository Layout](#feature-repository-layout) + +## feature_store.yaml + +Minimal local config: +```yaml +project: my_project +registry: data/registry.db +provider: local +online_store: + type: sqlite + path: data/online_store.db +``` + +GCP config: +```yaml +project: my_project +registry: gs://my-bucket/registry.pb +provider: gcp +online_store: + type: datastore +offline_store: + type: bigquery +``` + +AWS config: +```yaml +project: my_project +registry: s3://my-bucket/registry.pb +provider: aws +online_store: + type: dynamodb + region: us-east-1 +offline_store: + type: redshift + cluster_id: my-cluster + region: us-east-1 + database: feast + user: admin + s3_staging_location: s3://my-bucket/feast-staging +``` + +## RepoConfig Fields + +| Field | Alias | Type | Default | Description | +|-------|-------|------|---------|-------------| +| `project` | - | str | required | Project namespace (alphanumeric + underscores) | +| `project_description` | - | str | None | Project description | +| `provider` | - | str | `"local"` | `"local"`, `"gcp"`, or `"aws"` | +| `registry` | `registry_config` | str/dict | required | Registry path or config object | +| `online_store` | `online_config` | str/dict | 
`"sqlite"` | Online store type or config | +| `offline_store` | `offline_config` | str/dict | `"dask"` | Offline store type or config | +| `batch_engine` | `batch_engine_config` | str/dict | `"local"` | Batch materialization engine | +| `auth` | - | dict | no_auth | Authentication config | +| `feature_server` | - | dict | None | Feature server config | +| `entity_key_serialization_version` | - | int | 3 | Entity key serialization version | +| `coerce_tz_aware` | - | bool | True | Coerce timestamps to timezone-aware | +| `materialization` | `materialization_config` | dict | default | Materialization options | +| `openlineage` | `openlineage_config` | dict | None | OpenLineage config | + +## Registry Configuration + +| Field | Default | Description | +|-------|---------|-------------| +| `registry_type` | `"file"` | `"file"`, `"sql"`, `"snowflake.registry"`, `"remote"` | +| `path` | `""` | Local path, GCS/S3 URI (file), or DB connection URL (sql) | +| `cache_ttl_seconds` | 600 | Registry cache TTL (0 = no expiry) | +| `cache_mode` | `"sync"` | `"sync"` or `"thread"` | +| `s3_additional_kwargs` | None | Extra boto3 kwargs for S3 | + +### File registry +```yaml +registry: data/registry.db +``` +or +```yaml +registry: + registry_type: file + path: data/registry.db + cache_ttl_seconds: 60 +``` + +### SQL registry +```yaml +registry: + registry_type: sql + path: postgresql://user:pass@host:5432/feast # pragma: allowlist secret + cache_ttl_seconds: 60 +``` + +### Remote registry +```yaml +registry: + registry_type: remote + path: grpc://feast-registry-server:6570 +``` + +## Online Store Types + +| Type | Config Key | Use Case | +|------|-----------|----------| +| `sqlite` | `path` | Local development | +| `redis` | `connection_string` | Production, low-latency | +| `dynamodb` | `region` | AWS-native | +| `datastore` | `project_id` | GCP-native | +| `bigtable` | `project_id`, `instance` | GCP, high-throughput | +| `postgres` | `host`, `port`, `database`, `user`, `password` 
| Self-managed | +| `snowflake.online` | `account`, `database`, `schema` | Snowflake ecosystem | +| `milvus` | `host`, `port` | Vector search | +| `qdrant` | `host`, `port` | Vector search | +| `remote` | `path` | Remote feature server | + +### Examples + +```yaml +# SQLite (local dev) +online_store: + type: sqlite + path: data/online_store.db + +# Redis +online_store: + type: redis + connection_string: redis://localhost:6379 + +# PostgreSQL +online_store: + type: postgres + host: localhost + port: 5432 + database: feast + db_schema: public + user: postgres + password: secret + +# Milvus (vector search) +online_store: + type: milvus + host: localhost + port: 19530 +``` + +## Offline Store Types + +| Type | Use Case | +|------|----------| +| `dask` | Local development (default) | +| `duckdb` | Local, fast analytics | +| `bigquery` | GCP | +| `snowflake.offline` | Snowflake | +| `redshift` | AWS | +| `spark` | Large-scale processing | +| `postgres` | Self-managed | +| `trino` | Federated queries | +| `athena` | AWS serverless | +| `clickhouse` | Analytics | +| `remote` | Remote offline server | + +### Examples + +```yaml +# DuckDB +offline_store: + type: duckdb + +# BigQuery +offline_store: + type: bigquery + project_id: my-gcp-project + dataset: feast_dataset + +# Snowflake +offline_store: + type: snowflake.offline + account: my_account + user: user + password: pass + database: FEAST + schema: PUBLIC + warehouse: COMPUTE_WH + +# Spark +offline_store: + type: spark + spark_conf: + spark.master: "local[*]" +``` + +## Batch Engine Types + +| Type | Description | +|------|-------------| +| `local` | Local Python process (default) | +| `snowflake.engine` | Snowflake-based materialization | +| `spark.engine` | Spark-based materialization | +| `lambda` | AWS Lambda-based | +| `k8s` | Kubernetes job-based | +| `ray.engine` | Ray-based | + +```yaml +batch_engine: + type: local +``` + +## Authentication + +| Type | Description | +|------|-------------| +| `no_auth` | No 
authentication (default) | +| `kubernetes` | Kubernetes service account | +| `oidc` | OpenID Connect (server-side) | +| `oidc_client` | OpenID Connect (client-side) | + +```yaml +# OIDC example +auth: + type: oidc + client_id: feast-client + auth_server_url: https://auth.example.com + auth_discovery_url: https://auth.example.com/.well-known/openid-configuration +``` + +## Feature Server + +```yaml +feature_server: + type: local +``` + +MCP-based feature server: +```yaml +feature_server: + type: mcp +``` + +## Materialization Config + +```yaml +materialization: + pull_latest_features: false # Only pull latest feature values per entity +``` + +## OpenLineage Config + +```yaml +openlineage: + enabled: true + transport_type: http # http, console, file, kafka + transport_url: http://marquez:5000 + transport_endpoint: api/v1/lineage + namespace: feast + emit_on_apply: true + emit_on_materialize: true +``` + +## Feature Repository Layout + +``` +my_feature_repo/ +├── feature_store.yaml # Required config +├── .feastignore # Optional gitignore-style file +├── driver_features.py # Feature definitions +├── customer_features.py # More definitions +└── data/ + ├── driver_stats.parquet # Data files (for FileSource) + └── registry.db # Auto-generated registry +``` + +- Feast recursively scans all `.py` files for feature definitions +- Use `.feastignore` to exclude files/directories from scanning +- `feast apply` registers all discovered definitions into the registry diff --git a/skills/references/feature-definitions.md b/skills/references/feature-definitions.md new file mode 100644 index 00000000000..dc6764e94c7 --- /dev/null +++ b/skills/references/feature-definitions.md @@ -0,0 +1,350 @@ +# Feature Definitions Reference + +## Table of Contents +- [Entity](#entity) +- [Field](#field) +- [Data Sources](#data-sources) +- [FeatureView](#featureview) +- [OnDemandFeatureView](#ondemandfeatureview) +- [StreamFeatureView](#streamfeatureview) +- [FeatureService](#featureservice) +- 
[Aggregation](#aggregation) + +## Entity + +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| `name` | str | required | Unique entity name | +| `join_keys` | List[str] | `[name]` | Join keys for lookup (only one supported) | +| `value_type` | ValueType | - | Deprecated; use `join_keys` instead | +| `description` | str | `""` | Human-readable description | +| `tags` | Dict[str,str] | `{}` | Metadata tags | +| `owner` | str | `""` | Owner/maintainer | + +```python +from feast import Entity +from feast.value_type import ValueType + +driver = Entity(name="driver_id", description="Driver identifier") +customer = Entity(name="customer_id", join_keys=["customer_id"]) +``` + +## Field + +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| `name` | str | required | Field name | +| `dtype` | FeastType | required | Data type | +| `description` | str | `""` | Description | +| `vector_index` | bool | False | Enable vector similarity search | +| `vector_length` | int | - | Vector dimension (required if `vector_index=True`) | +| `vector_search_metric` | str | - | `"COSINE"`, `"L2"`, `"INNER_PRODUCT"` | + +### Type System + +**Scalar types** (from `feast.types`): `Float32`, `Float64`, `Int32`, `Int64`, `String`, `Bool`, `Bytes`, `UnixTimestamp` + +**Collection types**: `Array(T)` where T is a scalar type (e.g., `Array(Float32)` for embeddings) + +**ValueType enum** (legacy, from `feast.value_type`): `STRING`, `INT32`, `INT64`, `FLOAT`, `DOUBLE`, `BOOL`, `BYTES`, `UNIX_TIMESTAMP`; plus `_LIST` and `_SET` variants. 
+ +**Python → Feast mapping**: `int` → INT64, `str` → STRING, `float` → DOUBLE, `bytes` → BYTES, `bool` → BOOL, `datetime` → UNIX_TIMESTAMP + +### Vector field example + +```python +Field( + name="embedding", + dtype=Array(Float32), + vector_index=True, + vector_length=384, + vector_search_metric="COSINE", +) +``` + +## Data Sources + +### Batch Sources + +**FileSource**: +```python +from feast import FileSource + +source = FileSource( + name="driver_stats", + path="data/driver_stats.parquet", + timestamp_field="event_timestamp", + created_timestamp_column="created", +) +``` + +**BigQuerySource**: +```python +from feast.infra.offline_stores.contrib.bigquery_offline_store.bigquery_source import BigQuerySource + +source = BigQuerySource( + name="driver_stats_bq", + table="project.dataset.driver_stats", + timestamp_field="event_timestamp", +) +``` + +Other batch sources: `SnowflakeSource`, `RedshiftSource`, `PostgreSQLSource`, `SparkSource`, `TrinoSource`, `AthenaSource`, `ClickhouseSource` + +### Stream Sources + +**KafkaSource**: +```python +from feast.data_source import KafkaSource + +source = KafkaSource( + name="driver_trips_stream", + kafka_bootstrap_servers="broker:9092", + topic="driver_trips", + timestamp_field="event_timestamp", + batch_source=file_source, # for backfill + message_format=AvroFormat(schema_json=schema), +) +``` + +**KinesisSource**: `region`, `stream_name`, `record_format`, `batch_source` + +**PushSource** (for manual push via SDK): +```python +from feast.data_source import PushSource + +push_source = PushSource(name="driver_push", batch_source=file_source) +``` + +### RequestSource (for OnDemandFeatureView) + +```python +from feast import RequestSource, Field +from feast.types import Float64 + +input_request = RequestSource( + name="vals_to_add", + schema=[Field(name="val_to_add", dtype=Float64)], +) +``` + +## FeatureView + +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| `name` | str | required 
| Unique name | +| `source` | DataSource | required | Batch or stream data source | +| `entities` | List[Entity] | `[]` | Associated entities | +| `schema` | List[Field] | `[]` | Feature schema (can be inferred from source) | +| `ttl` | timedelta | `timedelta(0)` | Time-to-live for features | +| `online` | bool | `True` | Available for online retrieval | +| `offline` | bool | `False` | Available for offline retrieval | +| `description` | str | `""` | Description | +| `tags` | Dict[str,str] | `{}` | Metadata | +| `owner` | str | `""` | Owner | +| `mode` | str | - | Transformation mode: `"python"`, `"pandas"`, `"sql"`, `"spark"`, `"ray"`, `"substrait"` | + +```python +from feast import FeatureView, Field +from feast.types import Float32, Int64 +from datetime import timedelta + +driver_hourly_stats = FeatureView( + name="driver_hourly_stats", + entities=[driver], + ttl=timedelta(days=365), + schema=[ + Field(name="conv_rate", dtype=Float32), + Field(name="acc_rate", dtype=Float32), + Field(name="avg_daily_trips", dtype=Int64), + ], + online=True, + source=driver_stats_source, +) +``` + +## OnDemandFeatureView + +Features computed at request time from other feature views and/or request data. 
+ +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| `name` | str | required | Unique name | +| `sources` | List | required | Input FeatureViews and/or RequestSources | +| `schema` | List[Field] | required | Output schema | +| `mode` | str | `"pandas"` | `"pandas"` or `"python"` | +| `singleton` | bool | `False` | Single-row dict input (mode="python" only) | +| `write_to_online_store` | bool | `False` | Precompute on write instead of read | +| `aggregations` | List[Aggregation] | `[]` | Pre-transformation aggregations | + +### Pandas mode (default) + +```python +@on_demand_feature_view( + sources=[driver_hourly_stats, input_request], + schema=[Field(name="conv_rate_plus_val", dtype=Float64)], + mode="pandas", +) +def transformed_conv_rate(inputs: pd.DataFrame) -> pd.DataFrame: + df = pd.DataFrame() + df["conv_rate_plus_val"] = inputs["conv_rate"] + inputs["val_to_add"] + return df +``` + +### Python mode + +```python +@on_demand_feature_view( + sources=[driver_hourly_stats], + schema=[Field(name="conv_rate_category", dtype=String)], + mode="python", +) +def categorize_conv_rate(inputs: dict) -> dict: + output = {"conv_rate_category": []} + for rate in inputs["conv_rate"]: + output["conv_rate_category"].append("high" if rate > 0.5 else "low") + return output +``` + +### Python singleton mode + +```python +@on_demand_feature_view( + sources=[driver_hourly_stats], + schema=[Field(name="conv_rate_category", dtype=String)], + mode="python", + singleton=True, +) +def categorize_conv_rate(inputs: dict) -> dict: + rate = inputs["conv_rate"] + return {"conv_rate_category": "high" if rate > 0.5 else "low"} +``` + +### Write-to-online-store mode + +```python +@on_demand_feature_view( + sources=[push_source], + schema=[Field(name="trips_today_category", dtype=String)], + write_to_online_store=True, +) +def categorize_trips(inputs: pd.DataFrame) -> pd.DataFrame: + df = pd.DataFrame() + df["trips_today_category"] = 
inputs["trips_today"].apply( + lambda x: "high" if x > 10 else "low" + ) + return df +``` + +### Aggregation-based ODFV + +```python +from feast.aggregation import Aggregation + +@on_demand_feature_view( + sources=[driver_hourly_stats], + schema=[Field(name="sum_trips", dtype=Int64)], + aggregations=[Aggregation(column="avg_daily_trips", function="sum")], +) +def agg_view(inputs: pd.DataFrame) -> pd.DataFrame: + return inputs +``` + +### Validation note + +Use `feast apply --skip-feature-view-validation` if ODFV validation fails with complex logic (validation uses random inputs). + +## StreamFeatureView + +Extends FeatureView for stream sources (Kafka, Kinesis, PushSource). + +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| `name` | str | required | Unique name | +| `source` | DataSource | required | KafkaSource, KinesisSource, or PushSource | +| `entities` | List[Entity] | `[]` | Entities | +| `schema` | List[Field] | `[]` | Schema | +| `ttl` | timedelta | `timedelta(0)` | TTL | +| `aggregations` | List[Aggregation] | `[]` | Windowed aggregations | +| `timestamp_field` | str | - | Required if using aggregations | +| `udf` | function | - | Transformation function | +| `mode` | str | - | `"python"`, `"pandas"`, `"spark"`, `"spark_sql"` | + +```python +from feast import StreamFeatureView, Field +from feast.types import Int64 +from feast.aggregation import Aggregation +from datetime import timedelta + +driver_stream = StreamFeatureView( + name="driver_trips_stream", + entities=[driver], + source=kafka_source, + schema=[Field(name="trips", dtype=Int64)], + ttl=timedelta(hours=2), + aggregations=[ + Aggregation(column="trips", function="count", time_window=timedelta(hours=1)), + ], + timestamp_field="event_timestamp", +) +``` + +## FeatureService + +Groups features from one or more feature views for retrieval. 
+ +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| `name` | str | required | Unique name | +| `features` | List | required | Feature views or projections | +| `description` | str | `""` | Description | +| `tags` | Dict[str,str] | `{}` | Metadata | +| `owner` | str | `""` | Owner | +| `logging_config` | LoggingConfig | - | Logging configuration | + +```python +from feast import FeatureService + +driver_activity_service = FeatureService( + name="driver_activity", + features=[ + driver_hourly_stats, + transformed_conv_rate, + ], + description="Features for driver activity model", +) +``` + +### Feature projections (select specific features) + +```python +driver_fs = FeatureService( + name="driver_ranking", + features=[ + driver_hourly_stats[["conv_rate", "acc_rate"]], + ], +) +``` + +## Aggregation + +For StreamFeatureView windowed aggregations. + +| Parameter | Type | Description | +|-----------|------|-------------| +| `column` | str | Source column name | +| `function` | str | `"sum"`, `"max"`, `"min"`, `"count"`, `"mean"` | +| `time_window` | timedelta | Aggregation window | +| `slide_interval` | timedelta | Slide interval (for sliding windows) | + +```python +from feast.aggregation import Aggregation +from datetime import timedelta + +agg = Aggregation( + column="trips", + function="count", + time_window=timedelta(hours=1), + slide_interval=timedelta(minutes=5), +) +``` diff --git a/skills/references/retrieval-and-rag.md b/skills/references/retrieval-and-rag.md new file mode 100644 index 00000000000..8198134e5bc --- /dev/null +++ b/skills/references/retrieval-and-rag.md @@ -0,0 +1,287 @@ +# Retrieval & RAG Reference + +## Table of Contents +- [FeatureStore Construction](#featurestore-construction) +- [Online Feature Retrieval](#online-feature-retrieval) +- [Historical Feature Retrieval](#historical-feature-retrieval) +- [Push and Write Operations](#push-and-write-operations) +- [Vector Similarity 
Search](#vector-similarity-search) +- [RAG Retriever](#rag-retriever) +- [FeatureStore API Quick Reference](#featurestore-api-quick-reference) + +## FeatureStore Construction + +```python +from feast import FeatureStore + +# From repo path (looks for feature_store.yaml) +store = FeatureStore(repo_path="path/to/feature_repo") + +# From config object +from feast.repo_config import RepoConfig +store = FeatureStore(config=RepoConfig( + project="my_project", + registry="data/registry.db", + provider="local", + online_store={"type": "sqlite", "path": "data/online.db"}, +)) + +# From explicit YAML path +from pathlib import Path +store = FeatureStore(fs_yaml_file=Path("custom/feature_store.yaml")) +``` + +## Online Feature Retrieval + +Low-latency lookup from the online store. Features must be materialized first. + +### By feature references +```python +result = store.get_online_features( + features=[ + "driver_hourly_stats:conv_rate", + "driver_hourly_stats:acc_rate", + "driver_hourly_stats:avg_daily_trips", + ], + entity_rows=[ + {"driver_id": 1001}, + {"driver_id": 1002}, + ], +) + +feature_dict = result.to_dict() +feature_df = result.to_df() +``` + +### By FeatureService +```python +result = store.get_online_features( + features=driver_ranking_service, + entity_rows=[{"driver_id": 1001}], +) +``` + +### Feature reference format +`"feature_view_name:feature_name"` — e.g., `"driver_hourly_stats:conv_rate"` + +## Historical Feature Retrieval + +Point-in-time correct joins for training data. Prevents data leakage by joining features based on event timestamps. 
+ +### Basic usage +```python +import pandas as pd +from datetime import datetime + +entity_df = pd.DataFrame({ + "driver_id": [1001, 1002, 1003], + "event_timestamp": [ + datetime(2023, 6, 1), + datetime(2023, 6, 15), + datetime(2023, 7, 1), + ], +}) + +training_df = store.get_historical_features( + entity_df=entity_df, + features=[ + "driver_hourly_stats:conv_rate", + "driver_hourly_stats:acc_rate", + ], +).to_df() +``` + +### With FeatureService +```python +training_df = store.get_historical_features( + entity_df=entity_df, + features=driver_ranking_service, +).to_df() +``` + +### Output +Returns a `RetrievalJob` with methods: +- `.to_df()` — pandas DataFrame +- `.to_arrow()` — PyArrow Table +- `.to_sql_string()` — SQL query (for SQL-based offline stores) + +## Push and Write Operations + +### Push (for PushSource/StreamFeatureView) +```python +store.push( + push_source_name="driver_push", + df=pd.DataFrame({ + "driver_id": [1001], + "trips_today": [15], + "event_timestamp": [datetime.utcnow()], + }), +) +``` + +### Write to online store +```python +store.write_to_online_store( + feature_view_name="driver_hourly_stats", + df=features_df, +) +``` + +### Write to offline store +```python +store.write_to_offline_store( + feature_view_name="driver_hourly_stats", + df=features_df, +) +``` + +## Vector Similarity Search + +Requires a FeatureView with a `vector_index=True` field and an online store that supports vector search (e.g., Milvus, Qdrant, PostgreSQL with pgvector). 
+ +### Define vector feature view +```python +from feast import Entity, FeatureView, Field, FileSource +from feast.types import Array, Float32, String + +passage_entity = Entity(name="passage_id", join_keys=["passage_id"]) + +wiki_passages = FeatureView( + name="wiki_passages", + entities=[passage_entity], + schema=[ + Field(name="passage_text", dtype=String), + Field( + name="embedding", + dtype=Array(Float32), + vector_index=True, + vector_length=384, + vector_search_metric="COSINE", + ), + ], + source=passages_source, + online=True, +) +``` + +### Retrieve similar documents +```python +# v1 API +results = store.retrieve_online_documents( + feature="wiki_passages:embedding", + query=query_embedding_vector, + top_k=5, +) + +# v2 API (supports text, vector, and image queries) +results = store.retrieve_online_documents_v2( + feature_view_name="wiki_passages", + query_string="What is machine learning?", + top_k=5, +) +``` + +### Search metrics +- `"COSINE"` — Cosine similarity (default, best for normalized embeddings) +- `"L2"` — Euclidean distance +- `"INNER_PRODUCT"` — Dot product + +## RAG Retriever + +`FeastRAGRetriever` integrates Feast with HuggingFace for retrieval-augmented generation. 
+ +### Prerequisites +- A FeatureView with a `vector_index=True` embedding field +- Features materialized to the online store +- HuggingFace `transformers` installed + +### Setup +```python +from feast.rag_retriever import FeastRAGRetriever +from transformers import AutoTokenizer, AutoModel, AutoModelForSeq2SeqLM + +question_tokenizer = AutoTokenizer.from_pretrained("facebook/dpr-question_encoder-single-nq-base") +question_encoder = AutoModel.from_pretrained("facebook/dpr-question_encoder-single-nq-base") +generator_tokenizer = AutoTokenizer.from_pretrained("facebook/bart-large") +generator_model = AutoModelForSeq2SeqLM.from_pretrained("facebook/bart-large") + +retriever = FeastRAGRetriever( + question_encoder_tokenizer=question_tokenizer, + question_encoder=question_encoder, + generator_tokenizer=generator_tokenizer, + generator_model=generator_model, + feast_repo_path="path/to/feature_repo", + feature_view="wiki_passages", + features=["passage_text", "embedding"], + search_type="vector", # "text", "vector", or "hybrid" + id_field="passage_id", + text_field="passage_text", +) +``` + +### Retrieve documents +```python +doc_embeddings, doc_ids, doc_dicts = retriever.retrieve( + question_input_ids=question_tokenizer("What is ML?", return_tensors="pt")["input_ids"], + n_docs=5, +) +``` + +### End-to-end answer generation +```python +answer = retriever.generate_answer( + query="What is machine learning?", + top_k=5, + max_new_tokens=200, +) +print(answer) +``` + +### FeastVectorStore (lower-level) + +```python +from feast.vector_store import FeastVectorStore + +vector_store = FeastVectorStore(feast_repo_path="path/to/feature_repo") + +results = vector_store.query( + query_vector=embedding_list, + top_k=10, +) +``` + +Supports `query_vector`, `query_string`, and `query_image_bytes` for different search modalities. 
+ +## FeatureStore API Quick Reference + +| Method | Purpose | +|--------|---------| +| `apply(objects)` | Register entities, FVs, ODFVs, SFVs, services, sources | +| `plan(desired_registry)` | Preview apply changes | +| `get_online_features(features, entity_rows)` | Low-latency online lookup | +| `get_historical_features(entity_df, features)` | Point-in-time training data | +| `materialize(start_date, end_date)` | Load offline → online store | +| `materialize_incremental(end_date)` | Incremental materialization | +| `push(push_source_name, df)` | Push data to online/offline store | +| `write_to_online_store(fv_name, df)` | Direct write to online store | +| `write_to_offline_store(fv_name, df)` | Direct write to offline store | +| `retrieve_online_documents(feature, query, top_k)` | Vector similarity search | +| `retrieve_online_documents_v2(...)` | Vector search v2 (text/vector/image) | +| `list_entities()` | List all entities | +| `list_feature_views()` | List all feature views | +| `list_on_demand_feature_views()` | List on-demand feature views | +| `list_stream_feature_views()` | List stream feature views | +| `list_feature_services()` | List feature services | +| `list_data_sources()` | List data sources | +| `get_entity(name)` | Get entity by name | +| `get_feature_view(name)` | Get feature view by name | +| `get_feature_service(name)` | Get feature service by name | +| `delete_feature_view(name)` | Delete a feature view | +| `delete_feature_service(name)` | Delete a feature service | +| `create_saved_dataset(...)` | Save a dataset for reuse | +| `refresh_registry()` | Force refresh registry cache | +| `teardown()` | Remove all infrastructure resources | +| `serve(port)` | Start feature server | +| `serve_ui(port)` | Start Feast UI | +| `serve_registry(port)` | Start registry server | +| `serve_offline(port)` | Start offline server |