from __future__ import annotations
import os
import re
import sys
import urllib.request
from pathlib import Path
import yaml
from felderize.constants import DEFAULT_DOCS_BASE_URL, HTTP_TIMEOUT
# Map each category to its doc file.
_DOC_FILES: dict[str, str] = {
"types": "types.md",
"string": "string.md",
"datetime": "datetime.md",
"json": "json.md",
"aggregates": "aggregates.md",
"array": "array.md",
"map": "map.md",
"decimal": "decimal.md",
"float": "float.md",
"casts": "casts.md",
"comparisons": "comparisons.md",
"boolean": "boolean.md",
"integer": "integer.md",
"binary": "binary.md",
"operators": "operators.md",
"streaming": "streaming.md",
"unsupported": "unsupported-operations.md",
"uuid": "uuid.md",
}
# SQL construct patterns that cannot be derived from the function index
# (keywords, operators, syntax forms rather than named functions).
# Keep these specific — broad patterns like \bDATE\b match almost every query.
_EXTRA_PATTERNS: dict[str, list[str]] = {
"datetime": [r"\bINTERVAL\b"], # DATE/TIMESTAMP covered by index function names
"aggregates": [
r"\bGROUP\s+BY\b",
r"\bHAVING\b",
r"\bOVER\s*\(",
r"\bFIRST_VALUE\b",
r"\bLAST_VALUE\b", # in doc but not function index
r"\bVARIANCE\b",
r"\bVAR_POP\b",
r"\bVAR_SAMP\b",
r"\bPERCENT_RANK\b",
r"\bCUME_DIST\b",
r"\bNTILE\b",
],
"array": [r"\bEXPLODE\b", r"\bUNNEST\b", r"\bsize\s*\("],
"map": [r"\bMAP\s*<"], # MAP( covered by index; MAP< is type syntax
"json": [r"\bVARIANT\b"], # JSON covered by index function names
"casts": [r"::"], # CAST covered by index; :: is operator syntax
"comparisons": [r"\bCASE\s+WHEN\b"],
"boolean": [r"\bIS\s+(NOT\s+)?(?:TRUE|FALSE)\b", r"\bBOOLEAN\b", r"\bBOOL\b"],
"decimal": [r"\bDECIMAL\b"],
"integer": [
r"\bTINYINT\b",
r"\bSMALLINT\b",
r"\bBIGINT\b",
r"\bDIV_NULL\b",
r"\bSEQUENCE\b",
],
"binary": [r"\bVARBINARY\b", r"\bBINARY\b", r"\bXXHASH\b"],
"float": [r"\bFINITE_OR_NULL\b"],
"operators": [r"\bBETWEEN\b", r"<=>", r"\bCONTAINS\b", r"\bOVERLAPS\b"],
"streaming": [r"\bLATENESS\b", r"\bWATERMARK\b", r"\bTUMBLE\b", r"\bHOP\b"],
"uuid": [r"\bUUID\b"],
}
# Spark function names that appear in SQL but are not in the Feldera index.
_SPARK_ALIASES: dict[str, list[str]] = {
"json": [r"\bget_json_object\b", r"\bfrom_json\b", r"\bjson_tuple\b"],
"array": [
r"\barray_contains\b",
r"\bsort_array\b",
r"\barray_distinct\b",
r"\belement_at\b",
],
"map": [r"\belement_at\b", r"\bstr_to_map\b", r"\bmap_from_arrays\b"],
"datetime": [
r"\bto_date\b",
r"\bto_timestamp\b",
r"\bunix_timestamp\b",
r"\bfrom_unixtime\b",
r"\bdate_format\b",
r"\badd_months\b",
r"\bmonths_between\b",
r"\blast_day\b",
r"\bdayofweek\b",
r"\bdayofmonth\b",
r"\bdayofyear\b",
r"\bweekofyear\b",
],
"string": [
r"\bLPAD\b",
r"\bRPAD\b",
r"\bLTRIM\b",
r"\bRTRIM\b",
r"\bBTRIM\b",
r"\bTRANSLATE\b",
r"\bINSTR\b",
r"\bLOCATE\b",
r"\bSPACE\b",
],
"decimal": [r"\bNUMERIC\b"],
"float": [r"\bFLOAT\b", r"\bPOW\b", r"\bLOG2\b", r"\bHYPOT\b"],
"comparisons": [r"\bNVL\b", r"\bNVL2\b", r"\bDECODE\b"],
}
# Regex to find HTML anchor IDs embedded in doc files:
_ANCHOR_ID_RE = re.compile(r' str:
"""Fetch a SQL doc file from the Feldera docs repo.
The base URL is read from FELDERA_DOCS_BASE_URL (default: main branch on GitHub).
"""
base = os.environ.get("FELDERA_DOCS_BASE_URL", DEFAULT_DOCS_BASE_URL)
url = f"{base}/{filename}"
try:
with urllib.request.urlopen(url, timeout=HTTP_TIMEOUT) as resp:
return resp.read().decode("utf-8")
except Exception as e:
print(f"docs.py: failed to fetch {url}: {e}", file=sys.stderr)
return ""
def _build_categories_from_index() -> tuple[
dict[str, list[str]], dict[str, list[tuple[str, str]]]
]:
"""Fetch and parse function-index.md from the Feldera docs repo.
Returns:
categories: category → [\\bFUNC\\b, ...] trigger patterns
func_anchors: FUNC_NAME_UPPER → [(doc_filename, anchor_id), ...]
"""
known = set(_DOC_FILES) - _ALWAYS_INCLUDED
categories: dict[str, list[str]] = {cat: [] for cat in _DOC_FILES}
func_anchors: dict[str, list[tuple[str, str]]] = {}
text = _fetch_doc("function-index.md")
if not text:
return categories, func_anchors
for line in text.splitlines():
m = _INDEX_FUNC_RE.match(line)
if not m:
continue
func_name = m.group(1).strip()
func_upper = func_name.upper()
for link_m in _INDEX_LINK_RE.finditer(line):
cat = link_m.group(1)
doc_file = link_m.group(2) # e.g. "string.md"
anchor = link_m.group(3) # e.g. "upper" (may be None)
if cat in known:
keyword = rf"\b{re.escape(func_name)}\b"
if keyword not in categories[cat]:
categories[cat].append(keyword)
if anchor:
func_anchors.setdefault(func_upper, []).append((doc_file, anchor))
return categories, func_anchors
# Lazy cache — populated on first call to _get_categories_and_anchors().
_categories_cache: (
tuple[dict[str, list[str]], dict[str, list[tuple[str, str]]]] | None
) = None
def _get_categories_and_anchors() -> tuple[
dict[str, list[str]], dict[str, list[tuple[str, str]]]
]:
"""Return (categories, func_anchors), fetching function-index.md on first call."""
global _categories_cache
if _categories_cache is None:
categories, func_anchors = _build_categories_from_index()
for source in (_EXTRA_PATTERNS, _SPARK_ALIASES):
for cat, patterns in source.items():
seen = set(categories.get(cat, []))
for p in patterns:
if p not in seen:
categories.setdefault(cat, []).append(p)
seen.add(p)
_categories_cache = (categories, func_anchors)
return _categories_cache
# Module-level categories for load_examples() — populated lazily on first use.
def _get_categories() -> dict[str, list[str]]:
return _get_categories_and_anchors()[0]
# ── Section-level doc parsing ────────────────────────────────────────────────
# Cache: doc filename → (preamble, {heading: content}, {anchor_id: heading})
_section_cache: dict[str, tuple[str, dict[str, str], dict[str, str]]] = {}
def _parse_doc_sections(
content: str,
) -> tuple[str, dict[str, str], dict[str, str]]:
"""Split a doc file into (preamble, sections, anchor_map).
preamble — text before the first ## heading
sections — ordered dict: ## heading text → section content (includes heading line)
anchor_map — → ## heading text for every anchor in the file
"""
sections: dict[str, str] = {}
anchor_map: dict[str, str] = {}
preamble_lines: list[str] = []
current_heading: str | None = None
current_lines: list[str] = []
for line in content.splitlines(keepends=True):
if line.startswith("## "):
if current_heading is not None:
body = "".join(current_lines)
sections[current_heading] = body
for am in _ANCHOR_ID_RE.finditer(body):
anchor_map[am.group(1)] = current_heading
else:
preamble_lines = current_lines[:]
current_heading = line.rstrip()
current_lines = [line]
else:
current_lines.append(line)
if current_heading is not None:
body = "".join(current_lines)
sections[current_heading] = body
for am in _ANCHOR_ID_RE.finditer(body):
anchor_map[am.group(1)] = current_heading
elif current_lines:
preamble_lines = current_lines
return "".join(preamble_lines), sections, anchor_map
def _get_doc_sections(
filename: str,
) -> tuple[str, dict[str, str], dict[str, str]]:
"""Return parsed sections for a doc file (fetched once, then cached)."""
if filename not in _section_cache:
text = _fetch_doc(filename)
_section_cache[filename] = _parse_doc_sections(text) if text else ("", {}, {})
return _section_cache[filename]
def _load_relevant_sections(filename: str, relevant_anchors: set[str]) -> str:
"""Return preamble + only the ## sections that contain a relevant anchor.
Falls back to the full file content when no anchor information is available
(e.g., the file has no tags) so that we never return empty docs for
a matched category.
"""
preamble, sections, anchor_map = _get_doc_sections(filename)
if not sections:
# Plain file with no ## headings — return as-is.
return preamble
needed: set[str] = set()
for anchor in relevant_anchors:
if anchor in anchor_map:
needed.add(anchor_map[anchor])
if not needed:
# No specific functions detected or none matched → include everything.
parts = [preamble] if preamble.strip() else []
parts.append("".join(sections.values()))
return "".join(parts)
parts = [preamble] if preamble.strip() else []
for heading, body in sections.items():
if heading in needed:
parts.append(body)
return "".join(parts)
# ── Category detection ───────────────────────────────────────────────────────
def _detect_categories(
sql: str, categories: dict[str, list[str]] | None = None
) -> set[str]:
"""Return set of category names whose trigger patterns match the SQL."""
matched = _ALWAYS_INCLUDED.copy()
all_categories = categories if categories is not None else _get_categories()
for category, patterns in all_categories.items():
if not patterns:
continue
for pattern in patterns:
if re.search(pattern, sql, re.IGNORECASE):
matched.add(category)
break
return matched
def _detect_sql_functions(sql: str) -> set[str]:
"""Return uppercase names of all function calls found in the SQL."""
return {m.group(1).upper() for m in _SQL_FUNC_RE.finditer(sql)}
# ── Public API ───────────────────────────────────────────────────────────────
def load_docs(sql: str) -> str:
"""Load relevant Feldera doc sections based on SQL content.
Fetches docs from the Feldera GitHub repo on first call (cached per session).
Only sections whose anchors correspond to functions present in the SQL
are included. Falls back to full file content when no specific anchors match.
"""
categories, func_anchors = _get_categories_and_anchors()
detected = _detect_categories(sql, categories)
sql_funcs = _detect_sql_functions(sql)
result_sections: list[str] = []
for category in sorted(detected):
if category not in _DOC_FILES:
continue
doc_filename = _DOC_FILES[category]
relevant_anchors: set[str] = set()
for func in sql_funcs:
for fname, anchor in func_anchors.get(func, []):
if fname == doc_filename:
relevant_anchors.add(anchor)
content = _load_relevant_sections(doc_filename, relevant_anchors)
if content.strip():
result_sections.append(f"### {category}\n\n{content}")
return "\n\n---\n\n".join(result_sections)
_example_cache: dict[Path, tuple[set[str], str] | None] = {}
def _load_example_file(filepath: Path, detected: set[str]) -> str | None:
"""Return the body of one example file if it matches detected categories, else None.
Returns None for files that are not valid examples (no frontmatter, etc.).
"""
if filepath not in _example_cache:
raw = filepath.read_text()
if not raw.startswith("---"):
_example_cache[filepath] = None
return None
parts = raw.split("---", 2)
if len(parts) < 3:
_example_cache[filepath] = None
return None
file_categories: set[str] = set()
try:
meta = yaml.safe_load(parts[1])
if isinstance(meta, dict):
file_categories = set(meta.get("categories", []))
except yaml.YAMLError:
pass
_example_cache[filepath] = (file_categories, parts[2].strip())
cached = _example_cache[filepath]
if cached is None:
return None
file_categories, body = cached
return body if (not file_categories or file_categories & detected) else None
def _load_examples_from_dir(examples_dir: Path, detected: set[str]) -> list[str]:
"""Load matching example sections from a directory."""
if not examples_dir.is_dir():
return []
sections: list[str] = []
for filepath in sorted(examples_dir.glob("*.md")):
body = _load_example_file(filepath, detected)
if body is not None:
sections.append(body)
return sections
def load_examples(
sql: str,
extra_examples_dirs: list[Path] | None = None,
extra_examples_files: list[Path] | None = None,
) -> str:
"""Return validated translation examples relevant to the SQL input.
Loads from user-provided directories and/or individual files.
Directories are auto-discovered from ~/.felderize/examples/ and .felderize/examples/;
individual files and extra directories are passed explicitly.
"""
detected = _detect_categories(sql)
sections: list[str] = []
for d in extra_examples_dirs or []:
sections += _load_examples_from_dir(d, detected)
for f in extra_examples_files or []:
body = _load_example_file(f, detected)
if body is not None:
sections.append(body)
return "\n\n---\n\n".join(sections)