Skip to content

Commit 046f795

Browse files
committed
[migrations] Spark-to-Feldera migration tool PoC.
CLI tool using LLM to translate and syntactically validate Spark SQL programs to Feldera SQL. Signed-off-by: Wilma <wilmaontherun@gmail.com>
1 parent fb38716 commit 046f795

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

71 files changed

+8346
-0
lines changed

python/felderize/README.md

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
# Felderize — Spark SQL to Feldera SQL Translator
2+
3+
felderize translates Spark SQL schemas and queries into valid [Feldera](https://www.feldera.com/) SQL using LLM-based translation with optional compiler validation.
4+
5+
## Setup
6+
7+
```bash
8+
cd python/felderize
9+
python3 -m venv .venv
10+
source .venv/bin/activate
11+
pip install -e .
12+
```
13+
14+
> **Note:** `pip install -e .` is required before running `felderize`. It registers the package and CLI command.
15+
16+
Create a `.env` file with your API key:
17+
18+
```bash
19+
echo 'ANTHROPIC_API_KEY=your-key-here' > .env
20+
```
21+
22+
## Usage
23+
24+
### Run a built-in example
25+
26+
```bash
27+
# List available examples
28+
felderize example
29+
30+
# Translate an example
31+
felderize example simple
32+
33+
# With compiler validation
34+
felderize example simple --validate
35+
36+
# JSON output
37+
felderize example simple --json-output
38+
```
39+
40+
### Translate your own SQL
41+
42+
```bash
43+
felderize translate path/to/schema.sql path/to/query.sql
44+
felderize translate path/to/schema.sql path/to/query.sql --validate
45+
```
46+
47+
### Batch translation
48+
49+
```bash
50+
felderize batch path/to/data_dir/ --output-dir results/
51+
```
52+
53+
Each subdirectory should contain `*_schema.sql` and `*_query.sql` files.
54+
55+
## Configuration
56+
57+
Environment variables (set in `.env`):
58+
59+
| Variable | Description | Default |
60+
|---|---|---|
61+
| `ANTHROPIC_API_KEY` | Anthropic API key | (required) |
62+
| `FELDERIZE_LLM_PROVIDER` | `anthropic` or `openai` | `anthropic` |
63+
| `FELDERIZE_MODEL` | LLM model to use | `claude-sonnet-4-20250514` |
64+
| `OPENAI_API_KEY` | OpenAI API key (if using openai provider) | (unset) |
65+
| `FELDERA_COMPILER` | Path to sql-to-dbsp compiler | `<repo-root>/sql-to-dbsp-compiler/SQL-compiler/sql-to-dbsp` |
66+
67+
## How it works
68+
69+
1. Loads translation rules from skill files (`spark/data/skills/`)
70+
2. Sends Spark SQL to the LLM with rules and validated examples
71+
3. Parses the translated Feldera SQL from the LLM response
72+
4. Optionally validates output against the Feldera compiler, retrying with error feedback if needed
73+
74+
## Support
75+
76+
Contact us at support@feldera.com for assistance with unsupported Spark SQL features.

python/felderize/pyproject.toml

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
[build-system]
requires = ["setuptools>=68.0"]
build-backend = "setuptools.build_meta"

[project]
name = "felderize"
version = "0.1.0"
description = "Spark SQL to Feldera SQL translator agent"
requires-python = ">=3.10"
dependencies = [
    "anthropic>=0.39.0",     # default LLM provider SDK
    "openai>=1.50.0",        # alternative provider (FELDERIZE_LLM_PROVIDER=openai)
    "httpx>=0.27.0",
    "click>=8.1.0",          # CLI framework (felderize.cli)
    "pyyaml>=6.0",
    "python-dotenv>=1.0.0",  # loads .env configuration in Config.from_env
]

# The importable package "felderize" is sourced from the spark/ directory.
[tool.setuptools.package-dir]
felderize = "spark"

# Non-Python assets bundled into the wheel: skill files, sample/demo SQL,
# docs, and the Feldera compiler artifacts.
[tool.setuptools.package-data]
felderize = [
    "data/skills/**/*.md",
    "data/samples/*.md",
    "data/demo/*.sql",
    "data/demo/expected/*.sql",
    "data/docs/sql/*.md",
    "data/compiler/sql-to-dbsp",
    "data/compiler/*.jar",
]

# Console entry point: the `felderize` command invokes the click group in cli.py.
[project.scripts]
felderize = "felderize.cli:cli"

python/felderize/spark/__init__.py

Whitespace-only changes.

python/felderize/spark/cli.py

Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
1+
from __future__ import annotations
2+
3+
import json
4+
import sys
5+
from pathlib import Path
6+
7+
import click
8+
9+
from felderize.config import Config
10+
from felderize.models import TranslationResult
11+
from felderize.translator import translate_spark_to_feldera
12+
13+
14+
# Root command group; subcommands below: translate, batch, example.
@click.group()
def cli():
    """Spark SQL to Feldera SQL translator."""
17+
18+
19+
@cli.command()
@click.argument("schema_file", type=click.Path(exists=True))
@click.argument("query_file", type=click.Path(exists=True))
@click.option("--validate", is_flag=True, help="Validate against Feldera instance")
@click.option("--json-output", is_flag=True, help="Output as JSON")
@click.option("--no-docs", is_flag=True, help="Disable Feldera doc inclusion in prompt")
def translate(schema_file: str, query_file: str, validate: bool, json_output: bool, no_docs: bool):
    """Translate a single Spark SQL schema + query pair to Feldera SQL."""
    # Resolve runtime settings (provider, model, API key) from the environment.
    config = Config.from_env()

    # Read both inputs and run the translation pipeline.
    result = translate_spark_to_feldera(
        Path(schema_file).read_text(),
        Path(query_file).read_text(),
        config,
        validate=validate,
        include_docs=not no_docs,
    )

    # Emit either machine-readable JSON or the human-readable report.
    if json_output:
        click.echo(json.dumps(result.to_dict(), indent=2))
    else:
        _print_result(result)
39+
40+
41+
@cli.command()
@click.argument("data_dir", type=click.Path(exists=True))
@click.option("--validate", is_flag=True, help="Validate against Feldera instance")
@click.option("--output-dir", type=click.Path(), help="Write results to directory")
@click.option("--no-docs", is_flag=True, help="Disable Feldera doc inclusion in prompt")
def batch(data_dir: str, validate: bool, output_dir: str | None, no_docs: bool):
    """Translate all Spark SQL pairs in a directory.

    Each subdirectory of DATA_DIR must contain a ``*_schema.sql`` and a
    ``*_query.sql`` file. Progress and the summary go to stderr; the full
    result map is printed to stdout as JSON so it can be piped.
    """
    config = Config.from_env()
    data_path = Path(data_dir)
    results: dict[str, dict] = {}

    # Find all benchmark directories (sorted for a stable processing order).
    dirs = sorted(d for d in data_path.iterdir() if d.is_dir())

    if not dirs:
        click.echo("No benchmark directories found.", err=True)
        sys.exit(1)

    for bm_dir in dirs:
        name = bm_dir.name
        # sorted() makes the file choice deterministic: glob() order is
        # filesystem-dependent, so an unsorted [0] could silently pick a
        # different file between runs when several match the pattern.
        schema_files = sorted(bm_dir.glob("*_schema.sql"))
        query_files = sorted(bm_dir.glob("*_query.sql"))

        if not schema_files or not query_files:
            click.echo(f"Skipping {name}: missing schema or query file", err=True)
            continue
        if len(schema_files) > 1 or len(query_files) > 1:
            # Ambiguous input: proceed with the first match, but tell the user.
            click.echo(
                f"Warning: multiple schema/query files in {name}; using "
                f"{schema_files[0].name} and {query_files[0].name}",
                err=True,
            )

        schema_sql = schema_files[0].read_text()
        query_sql = query_files[0].read_text()

        click.echo(f"Translating {name}...", err=True)
        result = translate_spark_to_feldera(
            schema_sql, query_sql, config, validate=validate, include_docs=not no_docs,
        )
        results[name] = result.to_dict()

        if output_dir:
            out_path = Path(output_dir)
            out_path.mkdir(parents=True, exist_ok=True)
            # Write the combined SQL artifact and the full structured result.
            (out_path / f"{name}.sql").write_text(
                result.feldera_schema + "\n\n" + result.feldera_query
            )
            (out_path / f"{name}.json").write_text(
                json.dumps(result.to_dict(), indent=2)
            )

    # Summary goes to stderr so stdout stays machine-readable.
    total = len(results)
    success = sum(1 for r in results.values() if r["status"] == "success")
    click.echo(f"\nResults: {success}/{total} successful", err=True)

    # Print full results as JSON to stdout.
    click.echo(json.dumps(results, indent=2))
94+
95+
96+
_EXAMPLES_DIR = Path(__file__).resolve().parent / "data" / "demo"
97+
98+
99+
@cli.command()
@click.argument("name", required=False)
@click.option("--validate/--no-validate", default=True, help="Validate against Feldera instance (default: on)")
@click.option("--json-output", is_flag=True, help="Output as JSON")
@click.option("--no-docs", is_flag=True, help="Disable Feldera doc inclusion in prompt")
def example(name: str | None, validate: bool, json_output: bool, no_docs: bool):
    """Run a built-in example translation.

    Without NAME, lists available examples. With NAME, translates that example.

    \b
    Usage:
      felderize example          # list available examples
      felderize example simple   # translate the 'simple' example
    """
    # Discover available examples: a complete pair is <name>_schema.sql plus
    # <name>_query.sql in the packaged demo directory; incomplete pairs are
    # silently skipped.
    pairs: dict[str, tuple[Path, Path]] = {}
    for schema_file in sorted(_EXAMPLES_DIR.glob("*_schema.sql")):
        example_name = schema_file.name.replace("_schema.sql", "")
        query_file = _EXAMPLES_DIR / f"{example_name}_query.sql"
        if query_file.is_file():
            pairs[example_name] = (schema_file, query_file)

    # List mode: no NAME given, show each example with the first line of its
    # schema as a preview, then exit.
    if not name:
        click.echo("Available examples:\n")
        for ex_name, (sf, qf) in pairs.items():
            schema_preview = sf.read_text().strip().split("\n")[0]
            click.echo(f" {ex_name:20s} {schema_preview}")
        click.echo(f"\nRun one with: felderize example <name>")
        return

    if name not in pairs:
        click.echo(f"Unknown example '{name}'. Available: {', '.join(pairs)}", err=True)
        sys.exit(1)

    schema_file, query_file = pairs[name]
    schema_sql = schema_file.read_text()
    query_sql = query_file.read_text()

    # Echo the Spark-side input to stderr so stdout carries only the result.
    click.echo(f"-- Spark Schema ({name}) --", err=True)
    click.echo(schema_sql.strip(), err=True)
    click.echo(f"\n-- Spark Query ({name}) --", err=True)
    click.echo(query_sql.strip(), err=True)
    click.echo("\nTranslating...\n", err=True)

    config = Config.from_env()
    result = translate_spark_to_feldera(
        schema_sql, query_sql, config, validate=validate, include_docs=not no_docs,
    )

    if json_output:
        click.echo(json.dumps(result.to_dict(), indent=2))
    else:
        _print_result(result)
153+
154+
155+
# Appended to output when the result lists Spark features that could not be
# mapped to Feldera SQL.
_CONTACT_MESSAGE = (
    "\n Some Spark SQL features are not yet supported in Feldera.\n"
    " Contact us at support@feldera.com to request support for these features."
)

# Appended when the translation fails validation outright.
_ERROR_CONTACT_MESSAGE = "\n Contact us at support@feldera.com for assistance."
161+
162+
163+
def _print_result(result: TranslationResult):
    """Pretty-print a translation result.

    Translated SQL goes to stdout; everything else (failure details,
    explanations, warnings, status) goes to stderr so the SQL can be piped
    cleanly to a file or another tool.
    """
    # Local import; NOTE(review): presumably to avoid an import cycle or keep
    # CLI startup light — confirm against felderize.models.
    from felderize.models import Status

    # Failure path: on ERROR the warnings list carries the compiler errors.
    if result.status == Status.ERROR:
        click.echo("-- Translation Failed --", err=True)
        click.echo(
            " The translation could not be validated against the Feldera compiler.",
            err=True,
        )
        if result.warnings:
            click.echo(" Errors:", err=True)
            for item in result.warnings:
                click.echo(f" - {item}", err=True)
        click.echo(_ERROR_CONTACT_MESSAGE, err=True)
        click.echo(f"\nStatus: {result.status.value}", err=True)
        return

    # Success path: print whichever SQL sections are present to stdout.
    if result.feldera_schema:
        click.echo("-- Schema --")
        click.echo(result.feldera_schema)
        click.echo()

    if result.feldera_query:
        click.echo("-- Query --")
        click.echo(result.feldera_query)
        click.echo()

    if result.explanations:
        click.echo("-- Transformations Applied --", err=True)
        for item in result.explanations:
            click.echo(f" - {item}", err=True)
        click.echo(err=True)

    # Unsupported features get the support-contact footer.
    if result.unsupported:
        click.echo("-- Unsupported --", err=True)
        for item in result.unsupported:
            click.echo(f" - {item}", err=True)
        click.echo(_CONTACT_MESSAGE, err=True)
        click.echo(err=True)

    if result.warnings:
        click.echo("-- Warnings --", err=True)
        for item in result.warnings:
            click.echo(f" - {item}", err=True)

    click.echo(f"\nStatus: {result.status.value}", err=True)

python/felderize/spark/config.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
from __future__ import annotations
2+
3+
import os
4+
from dataclasses import dataclass
5+
from pathlib import Path
6+
7+
from dotenv import load_dotenv
8+
9+
10+
@dataclass
class Config:
    """Runtime settings for felderize, normally built via :meth:`from_env`."""

    llm_provider: str = "anthropic"
    model: str = ""
    api_key: str = ""
    feldera_compiler: str = ""

    @classmethod
    def from_env(cls) -> Config:
        """Build a Config from environment variables.

        A ``.env`` file at the package root is loaded first; load_dotenv never
        overrides variables already present in the environment.
        """
        load_dotenv(Path(__file__).resolve().parent.parent / ".env")

        provider = os.environ.get("FELDERIZE_LLM_PROVIDER", "anthropic")

        # Each provider has its own default model and API-key variable;
        # anything other than "openai" falls back to Anthropic.
        if provider == "openai":
            default_model, key_var = "gpt-4o", "OPENAI_API_KEY"
        else:
            default_model, key_var = "claude-sonnet-4-20250514", "ANTHROPIC_API_KEY"

        return cls(
            llm_provider=provider,
            model=os.environ.get("FELDERIZE_MODEL", default_model),
            api_key=os.environ.get(key_var, ""),
            feldera_compiler=os.environ.get("FELDERA_COMPILER", ""),
        )
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
-- Demo input: per-user engagement rollup over page_views.
-- Exercises Spark-specific aggregates (COLLECT_LIST, PERCENTILE_APPROX)
-- that the translator must map to Feldera equivalents.
CREATE OR REPLACE TEMP VIEW user_engagement AS
SELECT
    user_id,
    COUNT(DISTINCT page_url) AS unique_pages,
    COLLECT_LIST(page_url) AS visited_pages,
    AVG(view_duration) AS avg_duration,
    PERCENTILE_APPROX(view_duration, 0.95) AS p95_duration,
    MIN(view_time) AS first_seen,
    MAX(view_time) AS last_seen,
    -- CASE without ELSE yields NULL for non-mobile rows; COUNT skips NULLs.
    COUNT(CASE WHEN device_type = 'mobile' THEN 1 END) AS mobile_views
FROM page_views
GROUP BY user_id
HAVING COUNT(*) > 5;
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
-- Demo Spark schema backing the user_engagement example query.
CREATE TABLE page_views (
    view_id BIGINT,
    user_id BIGINT,
    page_url STRING,
    referrer STRING,
    device_type STRING,
    view_duration INT,
    view_time TIMESTAMP
) USING parquet;  -- Spark-only storage clause; presumably dropped in translation — verify
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
-- Demo input: exercises Spark collection functions (size, array_contains,
-- element_at) over ARRAY and MAP columns of session_profiles.
CREATE OR REPLACE TEMP VIEW session_tag_summary AS
SELECT
    user_id,
    size(tags) AS tag_count,
    array_contains(tags, 'vip') AS has_vip_tag,
    element_at(attributes, 'source') AS traffic_source
FROM session_profiles;
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
-- Demo Spark schema with nested collection types (ARRAY, MAP) for the
-- session_tag_summary example query.
CREATE TABLE session_profiles (
    session_id BIGINT,
    user_id BIGINT,
    tags ARRAY<STRING>,
    attributes MAP<STRING, STRING>,
    event_time TIMESTAMP
) USING parquet;  -- Spark-only storage clause; presumably dropped in translation — verify

0 commit comments

Comments
 (0)