diff --git a/docs/SUMMARY.md b/docs/SUMMARY.md index 5f24c2efd1b..f9d0d13f3c5 100644 --- a/docs/SUMMARY.md +++ b/docs/SUMMARY.md @@ -55,6 +55,7 @@ * [Retrieval Augmented Generation (RAG) with Feast](tutorials/rag-with-docling.md) * [RAG Fine Tuning with Feast and Milvus](../examples/rag-retriever/README.md) * [MCP - AI Agent Example](../examples/mcp_feature_store/README.md) +* [Feast-Powered AI Agent](../examples/agent_feature_store/README.md) ## How-to Guides diff --git a/docs/getting-started/genai.md b/docs/getting-started/genai.md index d8798b481d7..95287859f04 100644 --- a/docs/getting-started/genai.md +++ b/docs/getting-started/genai.md @@ -89,6 +89,17 @@ Implement semantic search by: 2. Converting search queries to embeddings 3. Finding semantically similar documents using vector search +### AI Agents with Context and Memory + +Feast can serve as both the **context provider** and **persistent memory layer** for AI agents. Unlike stateless RAG pipelines, agents make autonomous decisions about which tools to call and can write state back to the feature store: + +1. **Structured context**: Retrieve customer profiles, account data, and other entity-keyed features +2. **Knowledge retrieval**: Search vector embeddings for relevant documents +3. **Persistent memory**: Store and recall per-entity interaction history (last topic, resolution, preferences) using `write_to_online_store` +4. **Governed access**: All reads and writes are subject to the same RBAC, TTL, and audit policies as any other feature + +With MCP enabled, agents built with any framework (LangChain, CrewAI, AutoGen, or custom) can discover and call Feast tools dynamically. See the [Feast-Powered AI Agent example](../../examples/agent_feature_store/) and the blog post [Building AI Agents with Feast](https://feast.dev/blog/feast-agents-mcp/) for a complete walkthrough. 
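The persistent-memory step above reduces to a single REST call against the Feast feature server. The sketch below builds such a request payload; the `agent_memory` feature view and its columns are illustrative, and the payload shape follows the feature server's `/write-to-online-store` endpoint:

```python
import json
from datetime import datetime, timezone

def build_memory_write(customer_id: str, topic: str, resolution: str) -> dict:
    """Build a payload for POST /write-to-online-store on the feature server.

    The feature view name and columns here are illustrative -- use whatever
    your registry defines for agent memory.
    """
    return {
        "feature_view_name": "agent_memory",
        "df": {
            # One list entry per row being written
            "customer_id": [customer_id],
            "last_topic": [topic],
            "last_resolution": [resolution],
            "event_timestamp": [datetime.now(timezone.utc).isoformat()],
        },
        "allow_registry_cache": True,
    }

payload = build_memory_write("C1001", "SSO setup", "Sent SAML configuration steps")
print(json.dumps(payload, indent=2))
# To persist it (requires a running feature server):
#   requests.post("http://localhost:6566/write-to-online-store", json=payload)
```

Because memory is written as ordinary feature rows, the TTL and RBAC configuration that governs other feature views applies to it unchanged.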
+ ### Scaling with Spark Integration Feast integrates with Apache Spark to enable large-scale processing of unstructured data for GenAI applications: @@ -167,9 +178,11 @@ The MCP integration uses the `fastapi_mcp` library to automatically transform yo The fastapi_mcp integration automatically exposes your Feast feature server's FastAPI endpoints as MCP tools. This means AI assistants can: * **Call `/get-online-features`** to retrieve features from the feature store +* **Call `/retrieve-online-documents`** to perform vector similarity search +* **Call `/write-to-online-store`** to persist agent state (memory, notes, interaction history) * **Use `/health`** to check server status -For a complete example, see the [MCP Feature Store Example](../../examples/mcp_feature_store/). +For a basic MCP example, see the [MCP Feature Store Example](../../examples/mcp_feature_store/). For a full agent with persistent memory, see the [Feast-Powered AI Agent Example](../../examples/agent_feature_store/). 
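As a concrete sketch, the vector-search call can be driven by a small JSON payload. The `knowledge_base` feature references below are illustrative, and the request shape mirrors the `/retrieve-online-documents` call made by the agent example:

```python
import json

def build_vector_search(query: str, top_k: int = 3) -> dict:
    """Build a payload for POST /retrieve-online-documents (vector search).

    The knowledge_base feature references are illustrative; adjust them to
    the feature view defined in your registry.
    """
    return {
        "features": [
            "knowledge_base:title",
            "knowledge_base:content",
            "knowledge_base:category",
        ],
        "query_string": query,   # free-text query; the server resolves it to a vector
        "top_k": top_k,          # number of nearest documents to return
        "api_version": 2,
    }

payload = build_vector_search("How do I set up SSO for my team?")
print(json.dumps(payload, indent=2))
# requests.post("http://localhost:6566/retrieve-online-documents", json=payload)
```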
## Learn More @@ -181,6 +194,8 @@ For more detailed information and examples: * [Milvus Quickstart Example](https://github.com/feast-dev/feast/tree/master/examples/rag/milvus-quickstart.ipynb) * [Feast + Ray: Distributed Processing for RAG Applications](https://feast.dev/blog/feast-ray-distributed-processing/) * [MCP Feature Store Example](../../examples/mcp_feature_store/) +* [Feast-Powered AI Agent Example (with Memory)](../../examples/agent_feature_store/) +* [Blog: Building AI Agents with Feast](https://feast.dev/blog/feast-agents-mcp/) * [MCP Feature Server Reference](../reference/feature-servers/mcp-feature-server.md) * [Spark Data Source](../reference/data-sources/spark.md) * [Spark Offline Store](../reference/offline-stores/spark.md) diff --git a/examples/agent_feature_store/README.md b/examples/agent_feature_store/README.md new file mode 100644 index 00000000000..3456c7cb08a --- /dev/null +++ b/examples/agent_feature_store/README.md @@ -0,0 +1,374 @@ +# Feast-Powered AI Agent Example + +This example demonstrates an **AI agent with persistent memory** that uses **Feast as both a feature store and a context memory layer** through the **Model Context Protocol (MCP)**. This demo uses **Milvus** as the vector-capable online store, but Feast supports multiple vector backends -- including **Milvus, Elasticsearch, Qdrant, PGVector, and FAISS** -- swappable via configuration. + +## Why Feast for Agents? + +Agents need more than just access to data -- they need to **remember** what happened in prior interactions. Feast's online store is entity-keyed, low-latency, governed, and supports both reads and writes, making it a natural fit for agent context and memory. 
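The read half of that read/write loop is an entity-keyed `/get-online-features` call. A minimal sketch of the request payload, using the `agent_memory` feature references defined later in this example:

```python
import json

def build_memory_recall(customer_id: str) -> dict:
    """Build a payload for POST /get-online-features, keyed by customer.

    Feature references follow the agent_memory view in
    feature_repo/features.py of this example.
    """
    return {
        "features": [
            "agent_memory:last_topic",
            "agent_memory:last_resolution",
            "agent_memory:interaction_count",
        ],
        # Entity keys select which rows to read from the online store
        "entities": {"customer_id": [customer_id]},
    }

payload = build_memory_recall("C1001")
print(json.dumps(payload, indent=2))
# requests.post("http://localhost:6566/get-online-features", json=payload)
```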
+ +| Capability | How Feast Provides It | +|---|---| +| **Structured context** | Entity-keyed feature retrieval (customer profiles, account data) | +| **Document search** | Vector similarity search via pluggable backends (Milvus, Elasticsearch, Qdrant, PGVector, FAISS) | +| **Persistent memory** | Agent writes interaction notes back via `write_to_online_store` | +| **Governance** | RBAC, audit trails, and feature-level permissions | +| **TTL management** | Declarative expiration on feature views (memory auto-expires) | +| **Offline analysis** | Memory is queryable offline like any other feature | + +## Architecture + +```mermaid +%%{init: {'theme': 'base', 'themeVariables': {'primaryColor': '#E3F2FD', 'primaryBorderColor': '#1565C0', 'primaryTextColor': '#0D47A1', 'lineColor': '#546E7A', 'secondaryColor': '#F3E5F5', 'tertiaryColor': '#E8F5E9'}}}%% +flowchart LR + User((("๐ง User"))):::userClass + + subgraph AgentLoop ["๐ค Agent Loop"] + LLM["LLM Engine\ntool-calling ยท reasoning"]:::agentClass + end + + subgraph Feast ["๐๏ธ Feast MCP Server"] + direction TB + ReadAPI["GET /get-online-features"]:::feastClass + WriteAPI["POST /write-to-online-store"]:::feastClass + end + + subgraph VectorStore ["๐๏ธ Online Store"] + direction TB + Profiles[("๐ค customer_profile\nplan ยท spend ยท tickets")]:::storageClass + Articles[("๐ knowledge_base\nvector embeddings")]:::storageClass + Memory[("๐ง agent_memory\ntopic ยท resolution ยท prefs")]:::memoryClass + end + + User -->|"query"| LLM + LLM -->|"recall_memory\nlookup_customer\nsearch_kb"| ReadAPI + LLM -->|"save_memory"| WriteAPI + ReadAPI --> Profiles + ReadAPI --> Articles + ReadAPI --> Memory + WriteAPI --> Memory + ReadAPI -.->|"results"| LLM + LLM -->|"answer"| User + + classDef userClass fill:#E8EAF6,stroke:#283593,color:#1A237E + classDef agentClass fill:#E3F2FD,stroke:#1565C0,color:#0D47A1 + classDef feastClass fill:#FFF3E0,stroke:#E65100,color:#BF360C + classDef storageClass fill:#E8F5E9,stroke:#2E7D32,color:#1B5E20 + 
classDef memoryClass fill:#F3E5F5,stroke:#6A1B9A,color:#4A148C +``` + +## Tools (backed by Feast) + +The agent has four tools. Feast is both the **read path** (context) and the **write path** (memory): + +| Tool | Direction | What it does | When the LLM calls it | +|---|---|---|---| +| `lookup_customer` | READ | Fetches customer profile features (plan, spend, tickets) | Questions about the customer's account | +| `search_knowledge_base` | READ | Retrieves support articles from the vector store | Questions needing product docs | +| `recall_memory` | READ | Reads past interaction context (last topic, open issues, preferences) | Start of every conversation | +| `save_memory` | WRITE | Persists notes about this interaction back to Feast | After resolving the question | + +### Feast as Context Memory + +The `agent_memory` feature view stores per-customer interaction state: + +```python +agent_memory = FeatureView( + name="agent_memory", + entities=[customer], + schema=[ + Field(name="last_topic", dtype=String), + Field(name="last_resolution", dtype=String), + Field(name="interaction_count", dtype=Int64), + Field(name="preferences", dtype=String), + Field(name="open_issue", dtype=String), + ], + ttl=timedelta(days=30), +) +``` + +This gives agents **persistent, governed, entity-keyed memory** that survives across sessions, is versioned, and lives under the same RBAC as every other feature -- unlike an ad-hoc Redis cache or an in-process dict. + +## Prerequisites + +- Python 3.10+ +- Feast with MCP and Milvus support +- OpenAI API key (for live tool-calling; demo mode works without it) + +## Quickstart + +### One command + +```bash +cd examples/agent_feature_store +./run_demo.sh + +# Or with live LLM tool-calling: +OPENAI_API_KEY=sk-... ./run_demo.sh +``` + +The script installs dependencies, generates sample data, starts the Feast server, runs the agent, and cleans up on exit. + +### Step by step + +### 1. 
Install dependencies + +```bash +pip install "feast[mcp,milvus]" +``` + +### 2. Generate sample data and apply the registry + +```bash +cd examples/agent_feature_store +python setup_data.py +``` + +This creates: +- **3 customer profiles** with attributes like plan tier, spend, and satisfaction score +- **6 knowledge-base articles** with 384-dimensional vector embeddings +- **Empty agent memory scaffold** (populated as the agent runs) + +### 3. Start the Feast MCP Feature Server + +```bash +cd feature_repo +feast serve --host 0.0.0.0 --port 6566 --workers 1 +``` + +### 4. Run the agent + +In a new terminal: + +```bash +# Without API key: runs in demo mode (simulated tool selection) +python agent.py +``` + +To run with a real LLM, set the API key and (optionally) the base URL: + +```bash +# OpenAI +export OPENAI_API_KEY="sk-..." #pragma: allowlist secret +python agent.py + +# Ollama (free, local -- no API key needed) +ollama pull llama3.1:8b +export OPENAI_API_KEY="ollama" #pragma: allowlist secret +export OPENAI_BASE_URL="http://localhost:11434/v1" +export LLM_MODEL="llama3.1:8b" +python agent.py + +# Any OpenAI-compatible provider (Azure, vLLM, LiteLLM, etc.) +export OPENAI_API_KEY="your-key" #pragma: allowlist secret +export OPENAI_BASE_URL="https://your-endpoint/v1" +export LLM_MODEL="your-model" +python agent.py +``` + +### Demo mode output + +Without an API key, the agent simulates the decision-making process with memory: + +``` +================================================================= + Scene 1: Enterprise customer (C1001) asks about SSO + Customer: C1001 | Query: "How do I set up SSO for my team?" 
+================================================================= + [Demo mode] Simulating agent reasoning + + Round 1 | recall_memory(customer_id=C1001) + -> No prior interactions found + + Round 1 | lookup_customer(customer_id=C1001) + -> Alice Johnson | enterprise plan | $24,500 spend | 1 open tickets + + Round 1 | search_knowledge_base(query="How do I set up SSO for my team?...") + -> Best match: "Configuring single sign-on (SSO)" + + Round 2 | Generating personalised response... + + Round 2 | save_memory(customer_id=C1001, topic="SSO setup") + -> Memory saved for future conversations + + ───────────────────────────────────────────────────────────── + Agent Response: + ───────────────────────────────────────────────────────────── + Hi Alice! + Since you're on our Enterprise plan, SSO is available for your + team. Go to Settings > Security > SSO and enter your Identity + Provider metadata URL. We support SAML 2.0 and OIDC... + +================================================================= + Scene 4: C1001 returns -- does the agent remember Scene 1? + Customer: C1001 | Query: "I'm back about my SSO question from earlier." +================================================================= + [Demo mode] Simulating agent reasoning + + Round 1 | recall_memory(customer_id=C1001) + -> Previous topic: SSO setup + -> Open issue: none + -> Interaction count: 1 + + Round 1 | lookup_customer(customer_id=C1001) + -> Alice Johnson | enterprise plan | $24,500 spend | 1 open tickets + + Round 2 | Generating personalised response... + + Round 2 | save_memory(customer_id=C1001, topic="SSO setup") + -> Memory saved for future conversations + + ───────────────────────────────────────────────────────────── + Agent Response: + ───────────────────────────────────────────────────────────── + Welcome back, Alice! I can see from our records that we last + discussed "SSO setup". How can I help you today?
+``` + +Scene 4 demonstrates memory continuity -- the agent recalls the SSO conversation from Scene 1 without the customer re-explaining. + +### Live mode output (with API key) + +With an API key, the LLM autonomously decides which tools to use: + +``` +================================================================= + Scene 1: Enterprise customer (C1001) asks about SSO + Customer: C1001 | Query: "How do I set up SSO for my team?" +================================================================= + [Round 1] Tool call: recall_memory({'customer_id': 'C1001'}) + [Round 1] Tool call: lookup_customer({'customer_id': 'C1001'}) + [Round 1] Tool call: search_knowledge_base({'query': 'SSO setup'}) + [Round 2] Tool call: save_memory({'customer_id': 'C1001', 'topic': 'SSO setup', ...}) + Agent finished after 3 round(s) + + ───────────────────────────────────────────────────────────── + Agent Response: + ───────────────────────────────────────────────────────────── + Hi Alice! Since you're on our Enterprise plan, SSO is available + for your team. Go to Settings > Security > SSO and enter your + Identity Provider metadata URL. We support SAML 2.0 and OIDC... +``` + +## How It Works + +> **Why a raw loop?** This example builds the agent from scratch using the OpenAI tool-calling API to keep dependencies minimal and make every Feast call visible. In production, you would use a framework like LangChain/LangGraph, CrewAI, or AutoGen -- Feast's MCP endpoint lets any of them auto-discover the tools with zero custom wiring (see [MCP Integration](#mcp-integration) below). + +### The Agent Loop (`agent.py`) + +```python +for round in range(MAX_ROUNDS): + # 1. Send messages + tools to LLM + response = call_llm(messages, tools=[ + lookup_customer, search_knowledge_base, + recall_memory, save_memory, + ]) + + # 2. If LLM says "stop" -> return the answer + if response.finish_reason == "stop": + return response.content + + # 3.
Otherwise, execute the tool calls the LLM requested + for tool_call in response.tool_calls: + result = execute_tool(tool_call) # reads/writes Feast + messages.append(tool_result(result)) + + # 4. Loop back → LLM sees tool results and decides next action +``` + +The LLM sees the tool definitions (JSON Schema) and decides: +- **Which tools to call** (can call zero, one, or multiple per round) +- **What arguments to pass** (e.g., which customer ID to look up) +- **When to write memory** (after resolving, record topic + resolution) +- **When to stop** (once it has enough information to answer) + +### Feature Definitions (`feature_repo/features.py`) + +- **`customer_profile`**: Structured data (name, plan, spend, tickets, satisfaction) +- **`knowledge_base`**: Support articles with 384-dim vector embeddings (Milvus in this demo; swappable to Elasticsearch, Qdrant, PGVector, or FAISS) +- **`agent_memory`**: Per-customer interaction history (last topic, resolution, preferences, open issues) + +### MCP Integration + +The Feast Feature Server exposes all endpoints as MCP tools at `http://localhost:6566/mcp`.
+Any MCP-compatible framework can connect: + +```python +# LangChain +from langchain_mcp_adapters.client import MultiServerMCPClient +async with MultiServerMCPClient( + {"feast": {"url": "http://localhost:6566/mcp", "transport": "streamable_http"}} +) as client: + tools = client.get_tools() +``` + +```json +// Claude Desktop / Cursor +{ + "mcpServers": { + "feast": { + "url": "http://localhost:6566/mcp", + "transport": "streamable_http" + } + } +} +``` + +## Production Deployment + +For production, Feast fits into a layered platform architecture: + +```mermaid +%%{init: {'theme': 'base', 'themeVariables': {'primaryColor': '#E3F2FD', 'primaryBorderColor': '#1565C0', 'lineColor': '#546E7A'}}}%% +flowchart TB + Agent(("๐ค AI Agent")):::agentClass + + subgraph Platform ["๐ก๏ธ Production Platform"] + direction TB + Gateway["๐ MCP Gateway\nJWT auth ยท tool filtering"]:::platformClass + Sandbox["๐ฆ Sandboxed Container\nkernel-level isolation"]:::platformClass + Guardrails["๐ Guardrails Orchestrator\ninput/output screening"]:::platformClass + end + + subgraph Observability ["๐ Observability + Lifecycle"] + direction LR + OTel["OpenTelemetry + MLflow\ntraces ยท metrics ยท audit"]:::obsClass + Kagenti["Kagenti Operator\nAgentCard CRDs ยท discovery"]:::lifecycleClass + end + + subgraph FeastSvc ["๐๏ธ Feast MCP Server"] + FS["/mcp ยท /get-online-features ยท /write-to-online-store"]:::feastClass + end + + subgraph Store ["๐๏ธ Online Store"] + direction LR + P[("๐ค Profiles")]:::storageClass + K[("๐ Knowledge Base")]:::storageClass + M[("๐ง Agent Memory")]:::memoryClass + end + + Agent --> Gateway + Gateway --> Sandbox + Sandbox --> Guardrails + Guardrails --> FS + FS --> P + FS --> K + FS <--> M + OTel -.->|"traces"| FS + Kagenti -.->|"discover"| FS + + classDef agentClass fill:#E3F2FD,stroke:#1565C0,color:#0D47A1 + classDef platformClass fill:#FFEBEE,stroke:#C62828,color:#B71C1C + classDef obsClass fill:#FFF8E1,stroke:#F57F17,color:#E65100 + classDef lifecycleClass 
fill:#E0F2F1,stroke:#00695C,color:#004D40 + classDef feastClass fill:#FFF3E0,stroke:#E65100,color:#BF360C + classDef storageClass fill:#E8F5E9,stroke:#2E7D32,color:#1B5E20 + classDef memoryClass fill:#F3E5F5,stroke:#6A1B9A,color:#4A148C +``` + +This demo uses Milvus Lite (embedded). For production, swap to any supported vector-capable backend by updating `feature_store.yaml`: + +- **Milvus cluster**: Deploy via the [Milvus Operator](https://milvus.io/docs/install_cluster-milvusoperator.md) and set `host`/`port` instead of `path`. +- **Elasticsearch**: Set `online_store: type: elasticsearch` with your cluster URL. +- **Qdrant**: Set `online_store: type: qdrant` with your Qdrant endpoint. +- **PGVector**: Set `online_store: type: postgres` with `pgvector_enabled: true`. +- **FAISS**: Set `online_store: type: faiss` for in-process vector search. diff --git a/examples/agent_feature_store/agent.py b/examples/agent_feature_store/agent.py new file mode 100644 index 00000000000..32e15f781f3 --- /dev/null +++ b/examples/agent_feature_store/agent.py @@ -0,0 +1,795 @@ +""" +Customer-support AI agent powered by Feast features and memory via MCP. + +The LLM decides which tools to call, when to call them, and what to do +with the results. Feast acts as both the **context provider** (read) and +the **memory store** (write). + +The agent has four Feast-backed tools: + + READ tools: + - lookup_customer: Retrieve customer profile features. + - search_knowledge_base: Retrieve support articles. + - recall_memory: Read past interaction context for this customer. + + WRITE tool: + - save_memory: Persist notes about this interaction back to Feast. + +Memory is entity-keyed (per customer), TTL-managed, versioned, and governed +by the same RBAC as every other feature -- unlike an ad-hoc Redis cache or +an in-process dict. + +Prerequisites: + 1. Run `python setup_data.py` to populate sample data. + 2. 
Start the Feast server: `cd feature_repo && feast serve --host 0.0.0.0 --port 6566 --workers 1` + 3. Set OPENAI_API_KEY (required for tool-calling). + +Usage: + python agent.py +""" + +import os +import sys +from typing import Any + +import requests + +FEAST_SERVER = os.getenv("FEAST_SERVER_URL", "http://localhost:6566") +OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "") +OPENAI_BASE_URL = os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1") +LLM_MODEL = os.getenv("LLM_MODEL", "gpt-4o-mini") +MAX_TOOL_ROUNDS = 5 + +# --------------------------------------------------------------------------- +# Tools: each wraps a Feast REST call +# --------------------------------------------------------------------------- + +TOOLS_SPEC = [ + { + "type": "function", + "function": { + "name": "lookup_customer", + "description": ( + "Look up a customer's profile from the feature store. Returns " + "name, email, plan tier, account age, total spend, open support " + "tickets, and satisfaction score." + ), + "parameters": { + "type": "object", + "properties": { + "customer_id": { + "type": "string", + "description": "The customer ID, e.g. 'C1001'", + } + }, + "required": ["customer_id"], + }, + }, + }, + { + "type": "function", + "function": { + "name": "search_knowledge_base", + "description": ( + "Search the support knowledge base for articles relevant to " + "the user's question. Returns article titles, content, and " + "categories. Use this when you need product documentation or " + "how-to information." + ), + "parameters": { + "type": "object", + "properties": { + "query": { + "type": "string", + "description": "The search query describing what the user needs help with", + } + }, + "required": ["query"], + }, + }, + }, + { + "type": "function", + "function": { + "name": "recall_memory", + "description": ( + "Recall past interaction context for a customer from the memory " + "store. 
Returns the last topic discussed, how it was resolved, " + "interaction count, stated preferences, and any open issue. " + "Call this at the start of a conversation to personalise your " + "response based on history." + ), + "parameters": { + "type": "object", + "properties": { + "customer_id": { + "type": "string", + "description": "The customer ID to recall memory for", + } + }, + "required": ["customer_id"], + }, + }, + }, + { + "type": "function", + "function": { + "name": "save_memory", + "description": ( + "Save a note about this interaction to the memory store so " + "future conversations can reference it. Use this after resolving " + "a question to record what was discussed and any commitments made." + ), + "parameters": { + "type": "object", + "properties": { + "customer_id": { + "type": "string", + "description": "The customer ID", + }, + "topic": { + "type": "string", + "description": "Brief label for the topic discussed, e.g. 'SSO setup'", + }, + "resolution": { + "type": "string", + "description": "How the issue was resolved or what was communicated", + }, + "open_issue": { + "type": "string", + "description": "Any unresolved follow-up, or empty string if fully resolved", + }, + "preferences": { + "type": "string", + "description": "Any stated customer preferences to remember", + }, + }, + "required": ["customer_id", "topic", "resolution"], + }, + }, + }, +] + + +def tool_lookup_customer(customer_id: str) -> dict[str, Any]: + """Fetch customer profile features from Feast.""" + payload = { + "features": [ + "customer_profile:name", + "customer_profile:email", + "customer_profile:plan_tier", + "customer_profile:account_age_days", + "customer_profile:total_spend", + "customer_profile:open_tickets", + "customer_profile:satisfaction_score", + ], + "entities": {"customer_id": [customer_id]}, + } + resp = requests.post(f"{FEAST_SERVER}/get-online-features", json=payload) + resp.raise_for_status() + data = resp.json() + results = data.get("results", []) + 
feature_names = data.get("metadata", {}).get("feature_names", []) + + features = {} + for i, name in enumerate(feature_names): + values = results[i].get("values", []) + features[name] = values[0] if values else None + return features + + +def tool_search_knowledge_base(query: str) -> list[dict[str, Any]]: + """Search knowledge-base articles from Feast using keyword similarity.""" + payload = { + "features": [ + "knowledge_base:title", + "knowledge_base:content", + "knowledge_base:category", + ], + "query_string": query, + "top_k": 3, + "api_version": 2, + } + resp = requests.post(f"{FEAST_SERVER}/retrieve-online-documents", json=payload) + resp.raise_for_status() + data = resp.json() + results = data.get("results", []) + feature_names = data.get("metadata", {}).get("feature_names", []) + + num_docs = len(results[0]["values"]) if results else 0 + docs = [] + for doc_idx in range(num_docs): + doc = {} + for feat_idx, name in enumerate(feature_names): + doc[name] = results[feat_idx]["values"][doc_idx] + if doc.get("title"): + docs.append(doc) + return docs + + +def tool_recall_memory(customer_id: str) -> dict[str, Any]: + """Read the agent's memory for a customer from Feast.""" + payload = { + "features": [ + "agent_memory:last_topic", + "agent_memory:last_resolution", + "agent_memory:interaction_count", + "agent_memory:preferences", + "agent_memory:open_issue", + ], + "entities": {"customer_id": [customer_id]}, + } + resp = requests.post(f"{FEAST_SERVER}/get-online-features", json=payload) + resp.raise_for_status() + data = resp.json() + results = data.get("results", []) + feature_names = data.get("metadata", {}).get("feature_names", []) + + memory = {} + for i, name in enumerate(feature_names): + values = results[i].get("values", []) + memory[name] = values[0] if values else None + + has_memory = any(v is not None for k, v in memory.items() if k != "customer_id") + if not has_memory: + return {"status": "no_previous_interactions", "customer_id": customer_id} + 
return memory + + +def tool_save_memory( + customer_id: str, + topic: str, + resolution: str, + open_issue: str = "", + preferences: str = "", +) -> dict[str, str]: + """Write interaction memory back to Feast via the REST API.""" + from datetime import datetime, timezone + + existing = tool_recall_memory(customer_id) + prev_count = existing.get("interaction_count") or 0 + + now = datetime.now(timezone.utc).isoformat() + payload = { + "feature_view_name": "agent_memory", + "df": { + "customer_id": [customer_id], + "last_topic": [topic], + "last_resolution": [resolution], + "interaction_count": [prev_count + 1], + "preferences": [preferences], + "open_issue": [open_issue], + "event_timestamp": [now], + }, + "allow_registry_cache": True, + } + resp = requests.post(f"{FEAST_SERVER}/write-to-online-store", json=payload) + resp.raise_for_status() + return {"status": "saved", "customer_id": customer_id, "topic": topic} + + +TOOL_REGISTRY = { + "lookup_customer": lambda args: tool_lookup_customer(args["customer_id"]), + "search_knowledge_base": lambda args: tool_search_knowledge_base(args["query"]), + "recall_memory": lambda args: tool_recall_memory(args["customer_id"]), + "save_memory": lambda args: tool_save_memory( + customer_id=args["customer_id"], + topic=args["topic"], + resolution=args["resolution"], + open_issue=args.get("open_issue", ""), + preferences=args.get("preferences", ""), + ), +} + + +# --------------------------------------------------------------------------- +# Agent loop +# --------------------------------------------------------------------------- + +SYSTEM_PROMPT = """\ +You are a customer-support agent. Follow these steps EXACTLY: + +Step 1: Call recall_memory and lookup_customer in your FIRST round (call both at once). +Step 2: If needed, call search_knowledge_base. +Step 3: Write your answer to the customer. Personalise it using their name and plan tier. +Step 4: Call save_memory with a short topic and resolution. 
+ +IMPORTANT: +- Never call the same tool twice with the same arguments. +- Call at most 2-3 tools per round. +- After you have the tool results, WRITE your answer. Do not call more tools. +- Tailor your answer to the customer's plan: enterprise customers get full access, + pro/starter customers may need to upgrade for certain features. +""" + + +def _call_llm(messages: list, use_tools: bool = True) -> dict: + """Make a single LLM API call, returning the parsed choice dict.""" + url = f"{OPENAI_BASE_URL}/chat/completions" + payload: dict[str, Any] = { + "model": LLM_MODEL, + "messages": messages, + "temperature": 0.3, + } + if use_tools: + payload["tools"] = TOOLS_SPEC + resp = requests.post( + url, + headers={ + "Authorization": f"Bearer {OPENAI_API_KEY}", + "Content-Type": "application/json", + }, + json=payload, + ) + if not resp.ok: + print(f"\n ERROR {resp.status_code} from {url}") + print(f" Response: {resp.text[:300]}") + print( + "\n Hint: set OPENAI_BASE_URL to an OpenAI-compatible endpoint.\n" + " Examples:\n" + " OpenAI: export OPENAI_BASE_URL=https://api.openai.com/v1\n" + " Ollama: export OPENAI_BASE_URL=http://localhost:11434/v1\n" + ) + resp.raise_for_status() + return resp.json()["choices"][0] + + +def run_agent(customer_id: str, user_message: str) -> str: + """ + Agentic loop: the LLM decides which tools to call (if any), executes + them, feeds results back, and repeats until it produces a final answer. + Memory is saved as a fallback only if the LLM didn't call save_memory. 
+ """ + if not OPENAI_API_KEY: + return _run_agent_demo_mode(customer_id, user_message) + + response, memory_saved = _run_agent_llm(customer_id, user_message) + + if not memory_saved: + _ensure_memory_saved(customer_id, user_message, response) + + return response + + +def _ensure_memory_saved(customer_id: str, user_message: str, response: str) -> None: + """Fallback: save memory only when the LLM didn't call save_memory itself.""" + topic = _extract_topic(user_message) + resolution = response[:120] if response else "Answered query" + try: + tool_save_memory( + customer_id=customer_id, + topic=topic, + resolution=resolution, + ) + print(f' [Auto] Memory saved: topic="{topic}"') + except Exception as e: + print(f" [Auto] Failed to save memory: {e}") + + +def _run_agent_llm(customer_id: str, user_message: str) -> tuple[str, bool]: + """Core LLM agent loop. Returns (response_text, memory_was_saved).""" + import json + + messages = [ + {"role": "system", "content": SYSTEM_PROMPT}, + { + "role": "user", + "content": f"[Customer ID: {customer_id}]\n\n{user_message}", + }, + ] + + seen_calls: set[str] = set() + collected_context: dict[str, Any] = {} + memory_saved = False + + for round_num in range(1, MAX_TOOL_ROUNDS + 1): + choice = _call_llm(messages) + assistant_msg = choice["message"] + messages.append(assistant_msg) + + content = assistant_msg.get("content") or "" + if choice["finish_reason"] == "stop": + if content: + print(f" โ Agent finished after {round_num} round(s)") + return content, memory_saved + break # empty stop -- fall through to forced response + + tool_calls = assistant_msg.get("tool_calls", []) + if not tool_calls: + if content: + return content, memory_saved + break + + tool_names = [tc["function"]["name"] for tc in tool_calls] + print(f" ๐ง Round {round_num}: LLM chose tool(s): {', '.join(tool_names)}") + + for tc in tool_calls: + fn_name = tc["function"]["name"] + fn_args = json.loads(tc["function"]["arguments"]) + call_key = 
f"{fn_name}:{json.dumps(fn_args, sort_keys=True)}" + + if call_key in seen_calls: + print(f" [Round {round_num}] Skipping duplicate: {fn_name}({fn_args})") + messages.append( + { + "role": "tool", + "tool_call_id": tc["id"], + "content": json.dumps( + { + "note": "Already called. Use the results you have and respond." + } + ), + } + ) + continue + + seen_calls.add(call_key) + print(f" [Round {round_num}] โ {fn_name}({fn_args})") + + handler = TOOL_REGISTRY.get(fn_name) + if handler: + result = handler(fn_args) + result_str = json.dumps(result, default=str) + collected_context[fn_name] = result + if fn_name == "save_memory": + memory_saved = True + else: + result_str = json.dumps({"error": f"Unknown tool: {fn_name}"}) + + messages.append( + { + "role": "tool", + "tool_call_id": tc["id"], + "content": result_str, + } + ) + + # If the LLM never produced a response, force one last call without tools + print(" โณ Forcing final response...") + messages.append( + { + "role": "user", + "content": "You have all the information. Write your answer to the customer now.", + } + ) + choice = _call_llm(messages, use_tools=False) + content = choice["message"].get("content") or "" + if content: + return content, memory_saved + + # Last resort: build a response from collected tool results + return _fallback_response( + collected_context, customer_id, user_message + ), memory_saved + + +def _fallback_response(context: dict, customer_id: str, user_message: str) -> str: + """Build a basic response from collected tool results when the LLM fails.""" + profile = context.get("lookup_customer", {}) + name = profile.get("name", customer_id) + plan = profile.get("plan_tier", "your") + memory = context.get("recall_memory", {}) + + parts = [] + if memory and memory.get("last_topic"): + parts.append( + f'Welcome back, {name}! I see we last discussed "{memory["last_topic"]}".' + ) + else: + parts.append(f"Hi {name}!") + + parts.append( + f"You're on the {plan} plan. 
I've noted your question about "
+        f'"{user_message[:50]}" and will follow up.'
+    )
+    return " ".join(parts)
+
+
+def _run_agent_demo_mode(customer_id: str, user_message: str) -> str:
+    """
+    Demo mode: simulates the agentic tool-calling flow and generates a
+    personalised response that shows how Feast context shapes the answer.
+    """
+    print("   [Demo mode] Simulating agent reasoning\n")
+
+    # ── Round 1: recall memory ──────────────────────────────────────────
+    print(f"   Round 1 | recall_memory(customer_id={customer_id})")
+    memory = tool_recall_memory(customer_id)
+    has_memory = memory.get("status") != "no_previous_interactions"
+    if has_memory:
+        print(f"     -> Previous topic: {memory.get('last_topic')}")
+        print(f"     -> Open issue: {memory.get('open_issue') or 'none'}")
+        print(f"     -> Interaction count: {memory.get('interaction_count')}")
+    else:
+        print("     -> No prior interactions found")
+
+    # ── Round 1: lookup customer ────────────────────────────────────────
+    print(f"\n   Round 1 | lookup_customer(customer_id={customer_id})")
+    profile = tool_lookup_customer(customer_id)
+    name = profile.get("name", "Customer")
+    plan = profile.get("plan_tier", "unknown")
+    spend = profile.get("total_spend", 0)
+    tickets = profile.get("open_tickets", 0)
+    print(
+        f"     -> {name} | {plan} plan | ${spend:,.0f} spend | {tickets} open tickets"
+    )
+
+    # ── Round 1: search knowledge base (if question needs docs) ─────────
+    needs_kb = any(
+        kw in user_message.lower()
+        for kw in [
+            "how",
+            "what",
+            "set up",
+            "configure",
+            "reset",
+            "help",
+            "sso",
+            "api",
+            "invoice",
+            "upgrade",
+            "password",
+        ]
+    )
+    kb_article = None
+    if needs_kb:
+        print(f'\n   Round 1 | search_knowledge_base(query="{user_message[:50]}...")')
+        docs = tool_search_knowledge_base(user_message)
+        if docs:
+            kb_article = _pick_best_article(user_message, docs)
+            print(f'     -> Best match: "{kb_article.get("title")}"')
+
+    # ── Round 2: generate response (simulated LLM reasoning) ───────────
+    print("\n   Round 2 | 
Generating personalised response...")
+    response = _build_demo_response(
+        name=name,
+        plan=plan,
+        profile=profile,
+        memory=memory if has_memory else None,
+        kb_article=kb_article,
+        user_message=user_message,
+    )
+
+    # ── Round 2: save memory ────────────────────────────────────────────
+    topic = _extract_topic(user_message)
+    resolution = f"Answered based on {plan} plan context"
+    if kb_article:
+        resolution += f" and KB article '{kb_article.get('title')}'"
+    print(f'\n   Round 2 | save_memory(customer_id={customer_id}, topic="{topic}")')
+    tool_save_memory(
+        customer_id=customer_id,
+        topic=topic,
+        resolution=resolution,
+    )
+    print("     -> Memory saved for future conversations")
+
+    return response
+
+
+def _pick_best_article(query: str, docs: list) -> dict:
+    """Simple keyword matching to select the most relevant article."""
+    query_lower = query.lower()
+    keywords_to_category = {
+        "sso": "Configuring single sign-on",
+        "single sign": "Configuring single sign-on",
+        "password": "How to reset your password",  # pragma: allowlist secret
+        "reset": "How to reset your password",
+        "invoice": "Understanding your invoice",
+        "billing": "Understanding your invoice",
+        "upgrade": "Upgrading your subscription",
+        "plan": "Upgrading your subscription",
+        "api": "Setting up API access",
+        "rate limit": "Setting up API access",
+        "support": "Contacting support",
+        "contact": "Contacting support",
+    }
+    for keyword, title_prefix in keywords_to_category.items():
+        if keyword in query_lower:
+            for doc in docs:
+                if doc.get("title", "").startswith(title_prefix):
+                    return doc
+    return docs[0]
+
+
+def _extract_topic(message: str) -> str:
+    """Extract a short topic label from the user message."""
+    topic_map = {
+        "sso": "SSO setup",
+        "invoice": "Invoice help",
+        "upgrade": "Plan upgrade",
+        "api": "API access",
+        "password": "Password reset",  # pragma: allowlist secret
+        "reset": "Password reset",
+    }
+    lower = message.lower()
+    for keyword, topic in topic_map.items():
+        if 
keyword in lower: + return topic + return message[:40] + + +def _build_demo_response( + name: str, + plan: str, + profile: dict, + memory: dict | None, + kb_article: dict | None, + user_message: str, +) -> str: + """Build a realistic personalised response based on Feast context.""" + parts = [] + + # Acknowledge returning customer if we have memory + if memory and memory.get("last_topic"): + parts.append( + f"Welcome back, {name}! I can see from our records that we last " + f'discussed "{memory["last_topic"]}".' + ) + if memory.get("open_issue"): + parts.append( + f"I also notice you have an open issue: {memory['open_issue']}. " + "Let me know if you'd like to follow up on that." + ) + else: + parts.append(f"Hi {name}!") + + # Role-based response logic + lower = user_message.lower() + + if "sso" in lower: + if plan == "enterprise": + parts.append( + "Since you're on our Enterprise plan, SSO is available for your " + "team. Go to Settings > Security > SSO and enter your Identity " + "Provider metadata URL. We support SAML 2.0 and OIDC. Once " + "configured, all team members will authenticate through your IdP." + ) + parts.append( + "As an Enterprise customer, you also have a dedicated Slack " + "channel and account manager if you need hands-on help." + ) + elif plan == "pro": + parts.append( + "SSO is only available on our Enterprise plan. You're currently " + "on the Pro plan. Would you like to learn about upgrading? The " + "Enterprise plan includes SSO, priority support, and a dedicated " + "account manager." + ) + else: + parts.append( + "SSO is an Enterprise-only feature. You're currently on the " + f"Starter plan (${profile.get('total_spend', 0):,.0f} total spend). " + "You'd need to upgrade to Enterprise to access SSO. I can walk " + "you through the upgrade options if you're interested." + ) + + elif "invoice" in lower: + parts.append( + "Invoices are generated on the first of each month and sent to " + f"{profile.get('email', 'your billing email')}." 
+ ) + if plan == "enterprise": + parts.append( + "As an Enterprise customer, your invoice includes base plan " + "charges, any overage fees, and applied credits. You can also " + "reach your dedicated account manager for a detailed breakdown." + ) + else: + parts.append( + "You can download past invoices from Billing > Invoices. Each " + "invoice shows base charges and any overages." + ) + if profile.get("open_tickets", 0) > 0: + parts.append( + f"I also see you have {profile['open_tickets']} open support " + "ticket(s) -- let me know if any are billing-related." + ) + + elif "upgrade" in lower or "api" in lower: + if plan == "starter": + parts.append( + "Great question! You're on the Starter plan. Upgrading to Pro " + "gives you API access with 1,000 requests/minute. Enterprise " + "gets you 5,000 req/min plus priority support. The price " + "difference is prorated for your current billing cycle." + ) + elif plan == "pro": + parts.append( + "You're on the Pro plan with 1,000 API requests/minute. " + "Upgrading to Enterprise would give you 5,000 req/min, SSO, " + f"and a dedicated account manager. Given your ${profile.get('total_spend', 0):,.0f} " + "total spend, I can check if there are any loyalty discounts available." + ) + else: + parts.append( + "You're already on our Enterprise plan with the highest rate " + "limits (5,000 req/min). If you need even higher throughput, " + "I can connect you with your account manager to discuss custom limits." + ) + + elif memory and memory.get("last_topic"): + parts.append( + f"Yes, I have the full context from our previous conversation about " + f'"{memory["last_topic"]}". ' + f"We've now had {memory.get('interaction_count', 1)} interaction(s). " + "How can I help you today?" 
+ ) + + else: + parts.append("How can I help you today?") + + return "\n".join(parts) + + +# --------------------------------------------------------------------------- +# Demo queries +# --------------------------------------------------------------------------- + +DEMO_QUERIES = [ + # Scene 1: Enterprise customer asks about SSO -- should get full access instructions + ("C1001", "How do I set up SSO for my team?"), + # Scene 2: Starter customer asks the SAME question -- should be told it's Enterprise-only + ("C1003", "How do I set up SSO for my team?"), + # Scene 3: Pro customer asks about invoices -- response uses their email/ticket context + ("C1002", "I need help understanding my last invoice."), + # Scene 4: C1001 returns -- agent should recall the SSO conversation from Scene 1 + ("C1001", "I'm back about my SSO question from earlier."), +] + + +def main(): + print("=" * 65) + print(" Feast-Powered AI Agent Demo: Context + Memory") + print("=" * 65) + print() + print(" This demo shows two key capabilities:") + print(" 1. ROLE-BASED RESPONSES: Same question, different answer per plan tier") + print(" 2. 
PERSISTENT MEMORY: Agent recalls prior conversations via Feast")
+    print()
+    print(
+        "  Tools: recall_memory | lookup_customer | search_knowledge_base | save_memory"
+    )
+    print()
+
+    try:
+        resp = requests.get(f"{FEAST_SERVER}/health")
+        resp.raise_for_status()
+        print(f"Feast server: healthy at {FEAST_SERVER}")
+    except (requests.ConnectionError, requests.HTTPError) as exc:
+        print(f"ERROR: Cannot reach Feast server at {FEAST_SERVER} ({exc})")
+        print(
+            "Start it with: cd feature_repo && feast serve --host 0.0.0.0 --port 6566 --workers 1"
+        )
+        sys.exit(1)
+
+    if not OPENAI_API_KEY:
+        print("OPENAI_API_KEY not set -- running in demo mode (simulated reasoning)\n")
+    else:
+        print(f"Using LLM: {LLM_MODEL} via {OPENAI_BASE_URL}\n")
+
+    scene_labels = [
+        "Scene 1: Enterprise customer (C1001) asks about SSO",
+        "Scene 2: Starter customer (C1003) asks the SAME SSO question",
+        "Scene 3: Pro customer (C1002) asks about invoices",
+        "Scene 4: C1001 returns -- does the agent remember Scene 1?",
+    ]
+
+    for i, (customer_id, query) in enumerate(DEMO_QUERIES):
+        label = scene_labels[i] if i < len(scene_labels) else ""
+        print(f"\n{'=' * 65}")
+        print(f"  {label}")
+        print(f'  Customer: {customer_id} | Query: "{query}"')
+        print(f"{'=' * 65}")
+
+        response = run_agent(customer_id, query)
+
+        print(f"\n  {'─' * 61}")
+        print("  Agent Response:")
+        print(f"  {'─' * 61}")
+        for line in response.split("\n"):
+            print(f"  {line}")
+        print()
+
+
+if __name__ == "__main__":
+    main()
diff --git a/examples/agent_feature_store/feature_repo/feature_store.yaml b/examples/agent_feature_store/feature_repo/feature_store.yaml
new file mode 100644
index 00000000000..7cc89d536c8
--- /dev/null
+++ b/examples/agent_feature_store/feature_repo/feature_store.yaml
@@ -0,0 +1,30 @@
+project: feast_agent
+provider: local
+registry: data/registry.db
+
+online_store:
+  type: milvus
+  path: data/online_store.db
+  vector_enabled: true
+  embedding_dim: 384
+  index_type: "IVF_FLAT"
+  metric_type: "COSINE"
+  
nlist: 128 + +offline_store: + type: file + +entity_key_serialization_version: 3 + +feature_server: + type: mcp + enabled: true + mcp_enabled: true + mcp_transport: http + mcp_server_name: "feast-agent-demo" + mcp_server_version: "1.0.0" + feature_logging: + enabled: false + +auth: + type: no_auth diff --git a/examples/agent_feature_store/feature_repo/features.py b/examples/agent_feature_store/feature_repo/features.py new file mode 100644 index 00000000000..935b9cb03c0 --- /dev/null +++ b/examples/agent_feature_store/feature_repo/features.py @@ -0,0 +1,83 @@ +from datetime import timedelta + +from feast import Entity, FeatureView, Field, FileSource +from feast.data_format import ParquetFormat +from feast.types import Array, Float32, Float64, Int64, String, ValueType + +customer = Entity( + name="customer_id", + description="Unique customer identifier", + value_type=ValueType.STRING, +) + +document = Entity( + name="doc_id", + description="Knowledge-base document chunk identifier", + value_type=ValueType.INT64, +) + +customer_profile_source = FileSource( + file_format=ParquetFormat(), + path="data/customer_profiles.parquet", + timestamp_field="event_timestamp", +) + +knowledge_base_source = FileSource( + file_format=ParquetFormat(), + path="data/knowledge_base.parquet", + timestamp_field="event_timestamp", +) + +agent_memory_source = FileSource( + file_format=ParquetFormat(), + path="data/agent_memory.parquet", + timestamp_field="event_timestamp", +) + +customer_profile = FeatureView( + name="customer_profile", + entities=[customer], + schema=[ + Field(name="name", dtype=String), + Field(name="email", dtype=String), + Field(name="plan_tier", dtype=String), + Field(name="account_age_days", dtype=Int64), + Field(name="total_spend", dtype=Float64), + Field(name="open_tickets", dtype=Int64), + Field(name="satisfaction_score", dtype=Float64), + ], + source=customer_profile_source, + ttl=timedelta(days=1), +) + +knowledge_base = FeatureView( + name="knowledge_base", + 
entities=[document],
+    schema=[
+        Field(
+            name="vector",
+            dtype=Array(Float32),
+            vector_index=True,
+            vector_search_metric="COSINE",
+        ),
+        Field(name="title", dtype=String),
+        Field(name="content", dtype=String),
+        Field(name="category", dtype=String),
+    ],
+    source=knowledge_base_source,
+    ttl=timedelta(days=7),
+)
+
+agent_memory = FeatureView(
+    name="agent_memory",
+    entities=[customer],
+    schema=[
+        Field(name="last_topic", dtype=String),
+        Field(name="last_resolution", dtype=String),
+        Field(name="interaction_count", dtype=Int64),
+        Field(name="preferences", dtype=String),
+        Field(name="open_issue", dtype=String),
+    ],
+    source=agent_memory_source,
+    ttl=timedelta(days=30),
+)
diff --git a/examples/agent_feature_store/run_demo.sh b/examples/agent_feature_store/run_demo.sh
new file mode 100755
index 00000000000..487ed4a7c6e
--- /dev/null
+++ b/examples/agent_feature_store/run_demo.sh
@@ -0,0 +1,82 @@
+#!/usr/bin/env bash
+#
+# One-command setup and demo for the Feast-powered AI agent example.
+#
+# Usage:
+#   cd examples/agent_feature_store
+#   ./run_demo.sh                        # demo mode (no API key needed)
+#   OPENAI_API_KEY=sk-... ./run_demo.sh  # live LLM tool-calling
+#
+
+set -euo pipefail
+
+SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+cd "$SCRIPT_DIR"
+
+PYTHON="${PYTHON:-$(command -v python3 || command -v python || true)}"
+if [[ -z "$PYTHON" ]]; then
+  echo "ERROR: python3 or python not found on PATH."
+  exit 1
+fi
+PIP="${PIP:-$(command -v pip3 || command -v pip || true)}"
+if [[ -z "$PIP" ]]; then
+  echo "ERROR: pip3 or pip not found on PATH."
+  exit 1
+fi
+
+SERVER_PORT=6566
+SERVER_PID=""
+
+cleanup() {
+  if [[ -n "$SERVER_PID" ]]; then
+    echo ""
+    echo "Stopping Feast server (pid $SERVER_PID)..."
+    kill "$SERVER_PID" 2>/dev/null || true
+    wait "$SERVER_PID" 2>/dev/null || true
+  fi
+}
+trap cleanup EXIT
+
+# ── 1. Install dependencies ─────────────────────────────────────────────────
+echo "==> Step 1/4: Installing dependencies..."
+$PIP install -q "feast[mcp,milvus]"
+
+# ── 2. Generate data and apply registry ─────────────────────────────────────
+echo ""
+echo "==> Step 2/4: Generating sample data and applying Feast registry..."
+$PYTHON setup_data.py
+
+# ── 3. Start the Feast MCP server in the background ─────────────────────────
+echo ""
+echo "==> Step 3/4: Starting Feast MCP feature server on port $SERVER_PORT..."
+cd feature_repo
+feast serve --host 0.0.0.0 --port "$SERVER_PORT" --workers 1 &
+SERVER_PID=$!
+cd "$SCRIPT_DIR"
+
+echo "    Waiting for server to become healthy..."
+for i in $(seq 1 30); do
+  if curl -sf "http://localhost:${SERVER_PORT}/health" > /dev/null 2>&1; then
+    echo "    Server is ready."
+    break
+  fi
+  if ! kill -0 "$SERVER_PID" 2>/dev/null; then
+    echo "ERROR: Server process exited unexpectedly."
+    exit 1
+  fi
+  sleep 1
+done
+
+if ! curl -sf "http://localhost:${SERVER_PORT}/health" > /dev/null 2>&1; then
+  echo "ERROR: Server did not become healthy within 30 seconds."
+  exit 1
+fi
+
+# ── 4. Run the agent ────────────────────────────────────────────────────────
+echo ""
+echo "==> Step 4/4: Running the agent..."
+echo ""
+$PYTHON agent.py
+
+echo ""
+echo "Demo complete."
diff --git a/examples/agent_feature_store/setup_data.py b/examples/agent_feature_store/setup_data.py
new file mode 100644
index 00000000000..cd88e008af4
--- /dev/null
+++ b/examples/agent_feature_store/setup_data.py
@@ -0,0 +1,201 @@
+"""
+Generates sample data, applies the Feast registry, and materializes features
+into the online store so the agent demo is ready to run.
+ +Usage: + cd examples/agent_feature_store + python setup_data.py +""" + +import os +import sys + +import numpy as np +import pandas as pd + +REPO_DIR = os.path.join(os.path.dirname(__file__), "feature_repo") +DATA_DIR = os.path.join(REPO_DIR, "data") +os.makedirs(DATA_DIR, exist_ok=True) + +EMBEDDING_DIM = 384 +NOW = pd.Timestamp.now() + + +def generate_customer_profiles() -> pd.DataFrame: + customers = [ + { + "customer_id": "C1001", + "name": "Alice Johnson", + "email": "alice@example.com", + "plan_tier": "enterprise", + "account_age_days": 730, + "total_spend": 24500.00, + "open_tickets": 1, + "satisfaction_score": 4.5, + }, + { + "customer_id": "C1002", + "name": "Bob Smith", + "email": "bob@example.com", + "plan_tier": "pro", + "account_age_days": 365, + "total_spend": 8400.00, + "open_tickets": 3, + "satisfaction_score": 3.2, + }, + { + "customer_id": "C1003", + "name": "Carol Lee", + "email": "carol@example.com", + "plan_tier": "starter", + "account_age_days": 90, + "total_spend": 990.00, + "open_tickets": 0, + "satisfaction_score": 4.8, + }, + ] + df = pd.DataFrame(customers) + df["event_timestamp"] = NOW + return df + + +def generate_knowledge_base() -> pd.DataFrame: + articles = [ + { + "doc_id": 1, + "title": "How to reset your password", + "content": ( + "To reset your password, go to Settings > Security > Reset Password. " + "Enter your current password, then choose a new one that is at least " + "12 characters long. Click Save. If you forgot your current password, " + "click 'Forgot Password' on the login page to receive a reset link " + "via email." + ), + "category": "account", + }, + { + "doc_id": 2, + "title": "Upgrading your subscription plan", + "content": ( + "You can upgrade your plan from Starter to Pro or Enterprise at any " + "time. Navigate to Billing > Plans and select the plan you want. " + "The price difference is prorated for the current billing cycle. 
" + "Enterprise plans include priority support, custom integrations, " + "and a dedicated account manager." + ), + "category": "billing", + }, + { + "doc_id": 3, + "title": "Setting up API access", + "content": ( + "To generate an API key, go to Settings > Developer > API Keys and " + "click 'Create New Key'. Choose the appropriate scopes for your use " + "case. API keys are tied to your account and inherit your permissions. " + "Rate limits are 1000 requests/minute for Pro and 5000 for Enterprise." + ), + "category": "developer", + }, + { + "doc_id": 4, + "title": "Understanding your invoice", + "content": ( + "Invoices are generated on the first of each month and sent to the " + "billing email on file. Each invoice includes a breakdown of base plan " + "charges, overage fees, and any credits applied. You can download past " + "invoices from Billing > Invoices." + ), + "category": "billing", + }, + { + "doc_id": 5, + "title": "Configuring single sign-on (SSO)", + "content": ( + "SSO is available on Enterprise plans. To configure SSO, go to " + "Settings > Security > SSO and provide your Identity Provider (IdP) " + "metadata URL. We support SAML 2.0 and OIDC. Once configured, all " + "team members will authenticate through your IdP." + ), + "category": "account", + }, + { + "doc_id": 6, + "title": "Contacting support", + "content": ( + "You can reach our support team via the in-app chat widget, by " + "emailing support@example.com, or by opening a ticket at " + "https://support.example.com. Enterprise customers have access to " + "a dedicated Slack channel and a named account manager with a " + "guaranteed 1-hour response time." 
+ ), + "category": "support", + }, + ] + + np.random.seed(42) + df = pd.DataFrame(articles) + df["vector"] = [ + np.random.randn(EMBEDDING_DIM).astype(np.float32).tolist() + for _ in range(len(df)) + ] + df["event_timestamp"] = NOW + return df + + +def main(): + print("Generating customer profile data...") + customers_df = generate_customer_profiles() + customers_path = os.path.join(DATA_DIR, "customer_profiles.parquet") + customers_df.to_parquet(customers_path, index=False) + print(f" Saved {len(customers_df)} customer profiles to {customers_path}") + + print("Generating knowledge-base data...") + kb_df = generate_knowledge_base() + kb_path = os.path.join(DATA_DIR, "knowledge_base.parquet") + kb_df.to_parquet(kb_path, index=False) + print(f" Saved {len(kb_df)} knowledge-base articles to {kb_path}") + + print("Generating empty agent memory scaffold...") + memory_df = pd.DataFrame( + { + "customer_id": pd.Series(dtype="str"), + "last_topic": pd.Series(dtype="str"), + "last_resolution": pd.Series(dtype="str"), + "interaction_count": pd.Series(dtype="int64"), + "preferences": pd.Series(dtype="str"), + "open_issue": pd.Series(dtype="str"), + "event_timestamp": pd.Series(dtype="datetime64[ns]"), + } + ) + memory_path = os.path.join(DATA_DIR, "agent_memory.parquet") + memory_df.to_parquet(memory_path, index=False) + print(f" Saved empty memory scaffold to {memory_path}") + + print("Applying Feast registry...") + sys.path.insert(0, REPO_DIR) + from feast import FeatureStore + from features import ( + agent_memory, + customer, + customer_profile, + document, + knowledge_base, + ) + + store = FeatureStore(repo_path=REPO_DIR) + store.apply([customer, document, customer_profile, knowledge_base, agent_memory]) + + print("Materializing customer profiles to the online store...") + store.write_to_online_store(feature_view_name="customer_profile", df=customers_df) + print(" Done.") + + print("Materializing knowledge-base to the online store...") + 
store.write_to_online_store(feature_view_name="knowledge_base", df=kb_df) + print(" Done.") + + print("\nSetup complete! Start the feature server with:") + print(" cd feature_repo && feast serve --host 0.0.0.0 --port 6566") + + +if __name__ == "__main__": + main() diff --git a/infra/website/docs/blog/feast-agents-mcp.md b/infra/website/docs/blog/feast-agents-mcp.md new file mode 100644 index 00000000000..42abd5299b3 --- /dev/null +++ b/infra/website/docs/blog/feast-agents-mcp.md @@ -0,0 +1,312 @@ +--- +title: "Building AI Agents with Feast: Feature Stores as Context and Memory" +description: "How Feast's MCP integration turns your feature store into a governed context and memory layer for AI agents, bridging the gap between experimental agents and production-ready systems." +date: 2026-04-09 +authors: ["Nikhil Kathole"] +--- + +
+
+
+