Commits
45 commits
046f795
[migrations] Spark-to-Feldera migration tool PoC.
wilmaontherun Mar 16, 2026
bb1b24c
[ci] apply automatic fixes
feldera-bot Mar 16, 2026
871d79c
Intermediate progress based on Mihai's comments.
wilmaontherun Mar 19, 2026
e856080
fixed comments on skills
wilmaontherun Mar 19, 2026
b672679
Fixed all comments before we refactor skills
wilmaontherun Mar 19, 2026
3d8536f
Merge skills
wilmaontherun Mar 19, 2026
219f8b7
Fixed the rest of the code w.r.t. new skill file
wilmaontherun Mar 19, 2026
83d7731
Revised doc indexing
wilmaontherun Mar 19, 2026
98f7dee
merged skills
wilmaontherun Mar 19, 2026
56bd911
add --verbose flag, translate-file, combined demos, and Feldera PK/qu…
wilmaontherun Mar 19, 2026
b10f7a4
more demo files
wilmaontherun Mar 19, 2026
7d56201
revised samples and skills
wilmaontherun Mar 20, 2026
9e17a33
[ci] apply automatic fixes
feldera-bot Mar 20, 2026
3a75739
add --compiler option, fix no-compiler handling, improve example list…
wilmaontherun Mar 20, 2026
ab4f746
[ci] apply automatic fixes
feldera-bot Mar 20, 2026
6672146
fixed readme
wilmaontherun Mar 20, 2026
9975f33
[ci] apply automatic fixes
feldera-bot Mar 20, 2026
14e7cc6
Add --model option, remove OpenAI support and hardcoded compiler path
wilmaontherun Mar 20, 2026
7ac2fb9
Use sqlparse for SQL splitting, fix README inconsistencies
wilmaontherun Mar 20, 2026
6655f7c
Add prompt caching and rate limit retry; skip examples on first pass
wilmaontherun Mar 20, 2026
8fa1210
Clean up code quality: fix imports, types, and consistency issues
wilmaontherun Mar 21, 2026
4358104
Fix spark_skills.md inconsistencies
wilmaontherun Mar 24, 2026
80a5e7f
[ci] apply automatic fixes
feldera-bot Mar 24, 2026
7da4c30
Verify and fix spark_skills.md against Apache Spark SQL reference
wilmaontherun Mar 24, 2026
589406c
Overhaul spark/data/samples: fix errors, add new patterns, remove tri…
wilmaontherun Mar 24, 2026
b53a97a
Fix skills inconsistencies: QUARTER unsupported, contains/binary, pmo…
wilmaontherun Mar 25, 2026
05388f9
Rename misnamed sample files to match their content
wilmaontherun Mar 25, 2026
643ba2b
Improve and expand sample demos
wilmaontherun Mar 25, 2026
d1b0c95
Fix demo files: remove unsupported patterns, add dates and arithmetic…
wilmaontherun Mar 25, 2026
d7057b9
Fix aggregations and arithmetic demos to use only supported Feldera f…
wilmaontherun Mar 25, 2026
b438037
split_part skill & base_url config
anandbraman Mar 27, 2026
aaf02dd
Update AVG(integer) rule: rewrite to AVG(CAST(col AS DOUBLE)) for int…
wilmaontherun Mar 30, 2026
9de37f9
Untrack known_unsupported.yaml (ignored by .gitignore)
wilmaontherun Mar 30, 2026
e088426
Add dialect subcommand structure: felderize spark <cmd>
wilmaontherun Mar 30, 2026
d596584
Move data/skills one level above spark/ to felderize root
wilmaontherun Mar 30, 2026
d5bf6e0
Move skills file to spark/skills/
wilmaontherun Mar 30, 2026
e714f19
Address review comments: lateral aliases, size/CARDINALITY, JSON, sem…
wilmaontherun Mar 30, 2026
621dd95
Merge branch 'gh-readonly-queue/main/pr-5953-63f28bb9543f28137c91d7a2…
wilmaontherun Mar 30, 2026
a84c82f
Reorganize spark_skills.md: fix section placement and add missing ent…
wilmaontherun Mar 31, 2026
06b7bc6
Fix and improve spark_skills.md translation rules
wilmaontherun Apr 2, 2026
425d5ab
Reorganize spark_skills.md: structured subsections with emoji markers
wilmaontherun Apr 2, 2026
2b98d4e
Verify spark_skills.md claims against test evidence; fix errors found
wilmaontherun Apr 2, 2026
4faca22
Fix skills: STDDEV rewrite rule, GBD-REGEX-ESCAPE for RLIKE, CAST(num…
wilmaontherun Apr 2, 2026
c1f740d
README: note that Spark is current dialect, more planned
wilmaontherun Apr 2, 2026
d8a1ed6
[ci] apply automatic fixes
feldera-bot Apr 2, 2026
[migrations] Spark-to-Feldera migration tool PoC.
CLI tool that uses an LLM to translate Spark SQL programs to Feldera SQL and syntactically validate the result.

Signed-off-by: Wilma <wilmaontherun@gmail.com>
wilmaontherun committed Mar 16, 2026
commit 046f7952126a3084d20937df0f8d1f65a69bb625
76 changes: 76 additions & 0 deletions python/felderize/README.md
@@ -0,0 +1,76 @@
# Felderize — Spark SQL to Feldera SQL Translator

felderize translates Spark SQL schemas and queries into valid [Feldera](https://www.feldera.com/) SQL using LLM-based translation with optional compiler validation.

## Setup

```bash
cd python/felderize
python3 -m venv .venv
source .venv/bin/activate
pip install -e .
```

> **Note:** `pip install -e .` is required before running `felderize`. It registers the package and CLI command.

Create a `.env` file with your API key:

```bash
echo 'ANTHROPIC_API_KEY=your-key-here' > .env
```

## Usage

### Run a built-in example

```bash
# List available examples
felderize example

# Translate an example
felderize example simple

# With compiler validation
felderize example simple --validate

# JSON output
felderize example simple --json-output
```

### Translate your own SQL

```bash
felderize translate path/to/schema.sql path/to/query.sql
felderize translate path/to/schema.sql path/to/query.sql --validate
```

### Batch translation

```bash
felderize batch path/to/data_dir/ --output-dir results/
```

Each subdirectory should contain `*_schema.sql` and `*_query.sql` files.
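
For example, an input directory could be laid out like this (directory and file names are illustrative):

```
data_dir/
├── orders/
│   ├── orders_schema.sql
│   └── orders_query.sql
└── sessions/
    ├── sessions_schema.sql
    └── sessions_query.sql
```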

## Configuration

Environment variables (set in `.env`):

| Variable | Description | Default |
|---|---|---|
| `ANTHROPIC_API_KEY` | Anthropic API key | (required) |
| `FELDERIZE_LLM_PROVIDER` | `anthropic` or `openai` | `anthropic` |
| `FELDERIZE_MODEL` | LLM model to use | `claude-sonnet-4-20250514` |
| `OPENAI_API_KEY` | OpenAI API key (if using openai provider) | — |
| `FELDERA_COMPILER` | Path to sql-to-dbsp compiler | `<repo-root>/sql-to-dbsp-compiler/SQL-compiler/sql-to-dbsp` |
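
For example, a `.env` that pins the model and compiler path might look like this (the values shown are placeholders):

```bash
ANTHROPIC_API_KEY=sk-ant-your-key-here
FELDERIZE_MODEL=claude-sonnet-4-20250514
FELDERA_COMPILER=/path/to/sql-to-dbsp-compiler/SQL-compiler/sql-to-dbsp
```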

## How it works

1. Loads translation rules from skill files (`spark/data/skills/`)
2. Sends Spark SQL to the LLM with rules and validated examples
3. Parses the translated Feldera SQL from the LLM response
4. Optionally validates output against the Feldera compiler, retrying with error feedback if needed
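
In rough pseudocode, the validate-and-retry loop looks like the sketch below. The `llm` and `compiler` objects and the function name are placeholders for illustration, not the actual felderize internals:

```python
from pathlib import Path


def translate_with_retries(schema_sql: str, query_sql: str, llm, compiler,
                           skills_dir: Path, max_attempts: int = 3) -> str:
    # 1. Load translation rules from the skill files.
    rules = "\n\n".join(p.read_text() for p in sorted(skills_dir.glob("**/*.md")))
    feedback = ""
    feldera_sql = ""
    for _ in range(max_attempts):
        # 2./3. Send the rules plus the Spark SQL to the LLM and take its Feldera SQL answer.
        prompt = (f"{rules}\n\n-- Spark schema --\n{schema_sql}\n\n"
                  f"-- Spark query --\n{query_sql}{feedback}")
        feldera_sql = llm.complete(prompt)
        # 4. Validate with the Feldera compiler; on failure, feed the errors back
        #    into the next attempt.
        ok, errors = compiler.check(feldera_sql)
        if ok:
            return feldera_sql
        feedback = f"\n\nThe previous attempt failed to compile:\n{errors}\nPlease fix it."
    return feldera_sql  # best effort after the final attempt
```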

## Support

Contact us at support@feldera.com for assistance with unsupported Spark SQL features.
34 changes: 34 additions & 0 deletions python/felderize/pyproject.toml
@@ -0,0 +1,34 @@
[build-system]
requires = ["setuptools>=68.0"]
build-backend = "setuptools.build_meta"

[project]
name = "felderize"
version = "0.1.0"
description = "Spark SQL to Feldera SQL translator agent"
requires-python = ">=3.10"
dependencies = [
"anthropic>=0.39.0",
"openai>=1.50.0",
"httpx>=0.27.0",
"click>=8.1.0",
"pyyaml>=6.0",
"python-dotenv>=1.0.0",
]

[tool.setuptools.package-dir]
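# Map the "felderize" import package to the spark/ source directory, so modules
# are imported as felderize.cli, felderize.config, etc.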
felderize = "spark"

[tool.setuptools.package-data]
felderize = [
"data/skills/**/*.md",
"data/samples/*.md",
"data/demo/*.sql",
"data/demo/expected/*.sql",
"data/docs/sql/*.md",
"data/compiler/sql-to-dbsp",
"data/compiler/*.jar",
]

[project.scripts]
felderize = "felderize.cli:cli"
Empty file.
209 changes: 209 additions & 0 deletions python/felderize/spark/cli.py
@@ -0,0 +1,209 @@
from __future__ import annotations

import json
import sys
from pathlib import Path

import click

from felderize.config import Config
from felderize.models import TranslationResult
from felderize.translator import translate_spark_to_feldera


@click.group()
def cli():
"""Spark SQL to Feldera SQL translator."""


@cli.command()
@click.argument("schema_file", type=click.Path(exists=True))
@click.argument("query_file", type=click.Path(exists=True))
@click.option("--validate", is_flag=True, help="Validate against Feldera instance")
@click.option("--json-output", is_flag=True, help="Output as JSON")
@click.option("--no-docs", is_flag=True, help="Disable Feldera doc inclusion in prompt")
def translate(schema_file: str, query_file: str, validate: bool, json_output: bool, no_docs: bool):
"""Translate a single Spark SQL schema + query pair to Feldera SQL."""
config = Config.from_env()
schema_sql = Path(schema_file).read_text()
query_sql = Path(query_file).read_text()

result = translate_spark_to_feldera(
schema_sql, query_sql, config, validate=validate, include_docs=not no_docs,
)

if json_output:
click.echo(json.dumps(result.to_dict(), indent=2))
else:
_print_result(result)


@cli.command()
@click.argument("data_dir", type=click.Path(exists=True))
@click.option("--validate", is_flag=True, help="Validate against Feldera instance")
@click.option("--output-dir", type=click.Path(), help="Write results to directory")
@click.option("--no-docs", is_flag=True, help="Disable Feldera doc inclusion in prompt")
def batch(data_dir: str, validate: bool, output_dir: str | None, no_docs: bool):
"""Translate all Spark SQL pairs in a directory."""
config = Config.from_env()
data_path = Path(data_dir)
results: dict[str, dict] = {}

# Find all benchmark directories
dirs = sorted(d for d in data_path.iterdir() if d.is_dir())

if not dirs:
click.echo("No benchmark directories found.", err=True)
sys.exit(1)

for bm_dir in dirs:
name = bm_dir.name
schema_files = list(bm_dir.glob("*_schema.sql"))
query_files = list(bm_dir.glob("*_query.sql"))

if not schema_files or not query_files:
click.echo(f"Skipping {name}: missing schema or query file", err=True)
continue

schema_sql = schema_files[0].read_text()
query_sql = query_files[0].read_text()

click.echo(f"Translating {name}...", err=True)
result = translate_spark_to_feldera(
schema_sql, query_sql, config, validate=validate, include_docs=not no_docs,
)
results[name] = result.to_dict()

if output_dir:
out_path = Path(output_dir)
out_path.mkdir(parents=True, exist_ok=True)
(out_path / f"{name}.sql").write_text(
result.feldera_schema + "\n\n" + result.feldera_query
)
(out_path / f"{name}.json").write_text(
json.dumps(result.to_dict(), indent=2)
)

# Summary
total = len(results)
success = sum(1 for r in results.values() if r["status"] == "success")
click.echo(f"\nResults: {success}/{total} successful", err=True)

# Print full results as JSON to stdout
click.echo(json.dumps(results, indent=2))


_EXAMPLES_DIR = Path(__file__).resolve().parent / "data" / "demo"


@cli.command()
@click.argument("name", required=False)
@click.option("--validate/--no-validate", default=True, help="Validate against Feldera instance (default: on)")
@click.option("--json-output", is_flag=True, help="Output as JSON")
@click.option("--no-docs", is_flag=True, help="Disable Feldera doc inclusion in prompt")
def example(name: str | None, validate: bool, json_output: bool, no_docs: bool):
"""Run a built-in example translation.

Without NAME, lists available examples. With NAME, translates that example.

\b
Usage:
felderize example # list available examples
felderize example simple # translate the 'simple' example
"""
# Discover available examples
pairs: dict[str, tuple[Path, Path]] = {}
for schema_file in sorted(_EXAMPLES_DIR.glob("*_schema.sql")):
example_name = schema_file.name.replace("_schema.sql", "")
query_file = _EXAMPLES_DIR / f"{example_name}_query.sql"
if query_file.is_file():
pairs[example_name] = (schema_file, query_file)

if not name:
click.echo("Available examples:\n")
for ex_name, (sf, qf) in pairs.items():
schema_preview = sf.read_text().strip().split("\n")[0]
click.echo(f" {ex_name:20s} {schema_preview}")
click.echo(f"\nRun one with: felderize example <name>")
return

if name not in pairs:
click.echo(f"Unknown example '{name}'. Available: {', '.join(pairs)}", err=True)
sys.exit(1)

schema_file, query_file = pairs[name]
schema_sql = schema_file.read_text()
query_sql = query_file.read_text()

click.echo(f"-- Spark Schema ({name}) --", err=True)
click.echo(schema_sql.strip(), err=True)
click.echo(f"\n-- Spark Query ({name}) --", err=True)
click.echo(query_sql.strip(), err=True)
click.echo("\nTranslating...\n", err=True)

config = Config.from_env()
result = translate_spark_to_feldera(
schema_sql, query_sql, config, validate=validate, include_docs=not no_docs,
)

if json_output:
click.echo(json.dumps(result.to_dict(), indent=2))
else:
_print_result(result)


_CONTACT_MESSAGE = (
"\n Some Spark SQL features are not yet supported in Feldera.\n"
" Contact us at support@feldera.com to request support for these features."
)

_ERROR_CONTACT_MESSAGE = "\n Contact us at support@feldera.com for assistance."


def _print_result(result: TranslationResult):
"""Pretty-print a translation result."""
from felderize.models import Status

if result.status == Status.ERROR:
click.echo("-- Translation Failed --", err=True)
click.echo(
" The translation could not be validated against the Feldera compiler.",
err=True,
)
if result.warnings:
click.echo(" Errors:", err=True)
for item in result.warnings:
click.echo(f" - {item}", err=True)
click.echo(_ERROR_CONTACT_MESSAGE, err=True)
click.echo(f"\nStatus: {result.status.value}", err=True)
return

if result.feldera_schema:
click.echo("-- Schema --")
click.echo(result.feldera_schema)
click.echo()

if result.feldera_query:
click.echo("-- Query --")
click.echo(result.feldera_query)
click.echo()

if result.explanations:
click.echo("-- Transformations Applied --", err=True)
for item in result.explanations:
click.echo(f" - {item}", err=True)
click.echo(err=True)

if result.unsupported:
click.echo("-- Unsupported --", err=True)
for item in result.unsupported:
click.echo(f" - {item}", err=True)
click.echo(_CONTACT_MESSAGE, err=True)
click.echo(err=True)

if result.warnings:
click.echo("-- Warnings --", err=True)
for item in result.warnings:
click.echo(f" - {item}", err=True)

click.echo(f"\nStatus: {result.status.value}", err=True)
37 changes: 37 additions & 0 deletions python/felderize/spark/config.py
@@ -0,0 +1,37 @@
from __future__ import annotations

import os
from dataclasses import dataclass
from pathlib import Path

from dotenv import load_dotenv


@dataclass
class Config:
llm_provider: str = "anthropic"
model: str = ""
api_key: str = ""
feldera_compiler: str = ""

@classmethod
def from_env(cls) -> Config:
# Load .env from project root (won't override existing env vars)
env_path = Path(__file__).resolve().parent.parent / ".env"
load_dotenv(env_path)

provider = os.environ.get("FELDERIZE_LLM_PROVIDER", "anthropic")

if provider == "openai":
default_model = "gpt-4o"
api_key = os.environ.get("OPENAI_API_KEY", "")
else:
default_model = "claude-sonnet-4-20250514"
api_key = os.environ.get("ANTHROPIC_API_KEY", "")

return cls(
llm_provider=provider,
model=os.environ.get("FELDERIZE_MODEL", default_model),
api_key=api_key,
feldera_compiler=os.environ.get("FELDERA_COMPILER", ""),
)
13 changes: 13 additions & 0 deletions python/felderize/spark/data/demo/aggregations_query.sql
@@ -0,0 +1,13 @@
CREATE OR REPLACE TEMP VIEW user_engagement AS
SELECT
user_id,
COUNT(DISTINCT page_url) AS unique_pages,
COLLECT_LIST(page_url) AS visited_pages,
AVG(view_duration) AS avg_duration,
PERCENTILE_APPROX(view_duration, 0.95) AS p95_duration,
MIN(view_time) AS first_seen,
MAX(view_time) AS last_seen,
COUNT(CASE WHEN device_type = 'mobile' THEN 1 END) AS mobile_views
FROM page_views
GROUP BY user_id
HAVING COUNT(*) > 5;
9 changes: 9 additions & 0 deletions python/felderize/spark/data/demo/aggregations_schema.sql
@@ -0,0 +1,9 @@
CREATE TABLE page_views (
view_id BIGINT,
user_id BIGINT,
page_url STRING,
referrer STRING,
device_type STRING,
view_duration INT,
view_time TIMESTAMP
) USING parquet;
7 changes: 7 additions & 0 deletions python/felderize/spark/data/demo/arrays_query.sql
@@ -0,0 +1,7 @@
CREATE OR REPLACE TEMP VIEW session_tag_summary AS
SELECT
user_id,
size(tags) AS tag_count,
array_contains(tags, 'vip') AS has_vip_tag,
element_at(attributes, 'source') AS traffic_source
FROM session_profiles;
7 changes: 7 additions & 0 deletions python/felderize/spark/data/demo/arrays_schema.sql
@@ -0,0 +1,7 @@
CREATE TABLE session_profiles (
session_id BIGINT,
user_id BIGINT,
tags ARRAY<STRING>,
attributes MAP<STRING, STRING>,
event_time TIMESTAMP
) USING parquet;