diff --git a/examples/capture_sdk_m2m_token.py b/examples/capture_sdk_m2m_token.py new file mode 100644 index 000000000..cf5e16d4b --- /dev/null +++ b/examples/capture_sdk_m2m_token.py @@ -0,0 +1,74 @@ +"""Capture the SDK-issued M2M JWT on Staging SPOG vs Staging Legacy, decode claims, print side by side. + +Isolates the SDK credential flow — no pysql connector involvement — so the output +shows exactly what the server issues in response to the SDK's M2M request. +""" +import base64 +import json +import os +import sys +import urllib.request + +from databricks.sdk.core import Config, oauth_service_principal +import databricks.sdk as _sdk + +print(f"databricks-sdk version: {_sdk.version.__version__}\n") + +CLIENT_ID = os.getenv("DATABRICKS_DOGFOOD_AZURE_CLIENT_ID") +CLIENT_SECRET = os.getenv("DATABRICKS_DOGFOOD_AZURE_CLIENT_SECRET") +if not (CLIENT_ID and CLIENT_SECRET): + print("Missing DATABRICKS_DOGFOOD_AZURE_CLIENT_ID/SECRET") + sys.exit(1) + + +def decode_jwt(token: str): + """Decode a JWT's header + payload (no signature verification) and return dicts.""" + parts = token.split(".") + if len(parts) != 3: + raise ValueError("not a JWT") + + def b64url_decode(s): + s += "=" * (-len(s) % 4) + return json.loads(base64.urlsafe_b64decode(s)) + + return b64url_decode(parts[0]), b64url_decode(parts[1]) + + +def fetch_well_known(host): + """Print the /.well-known/databricks-config response so we see the metadata the SDK consumes.""" + try: + r = urllib.request.urlopen(f"https://{host}/.well-known/databricks-config", timeout=10) + body = r.read().decode("utf-8") + return json.loads(body) + except Exception as e: + return {"error": str(e)} + + +def capture(label, host): + print("=" * 80) + print(f"{label}: {host}") + print("=" * 80) + + print("\n/.well-known/databricks-config:") + print(json.dumps(fetch_well_known(host), indent=2)) + + cfg = Config(host=f"https://{host}", client_id=CLIENT_ID, client_secret=CLIENT_SECRET) + provider = oauth_service_principal(cfg) + headers = provider() + authz = headers.get("Authorization", "") + if not authz.startswith("Bearer "): + print(f"\nNo bearer token in headers: {headers!r}") + return + token = authz[len("Bearer ") :] + + print(f"\nAccess token (first 32 chars): {token[:32]}...") + hdr, payload = decode_jwt(token) + print("\nJWT header:") + print(json.dumps(hdr, indent=2)) + print("\nJWT payload (claims):") + print(json.dumps(payload, indent=2)) + print() + + +capture("Staging SPOG", "dogfood-spog.staging.azuredatabricks.net") +capture("Staging Legacy", "adb-7064161269814046.2.staging.azuredatabricks.net") diff --git a/examples/capture_sdk_m2m_trace.py b/examples/capture_sdk_m2m_trace.py new file mode 100644 index 000000000..32d4f7bf6 --- /dev/null +++ b/examples/capture_sdk_m2m_trace.py @@ -0,0 +1,80 @@ +"""Trace every HTTP request the SDK makes while fetching an M2M token on Staging SPOG. + +Monkey-patches urllib3 to log outbound requests + response status, so we see +the exact endpoint chain the SDK walks through to produce the captured token. +""" +import json +import logging +import os +import sys +import urllib.request + +from databricks.sdk.core import Config, oauth_service_principal + +# Enable verbose SDK logging +logging.basicConfig(level=logging.DEBUG, format="%(asctime)s %(name)s %(levelname)s %(message)s") +for name in ["databricks.sdk", "urllib3"]: + logging.getLogger(name).setLevel(logging.DEBUG) + +# Intercept HTTP calls with an adapter-level hook +import requests +_orig_send = requests.adapters.HTTPAdapter.send + +call_num = [0] +def traced_send(self, req, **kw): + call_num[0] += 1 + body_preview = "" + if req.body: + try: + body = req.body if isinstance(req.body, str) else req.body.decode("utf-8", errors="replace") + # Redact secrets so this output is pasteable + for secret_key in ("client_secret=", "assertion="): + if secret_key in body: + idx = body.find(secret_key) + len(secret_key) + amp = body.find("&", idx) + end = amp if amp >= 0 else len(body) + body = body[:idx] + "***REDACTED***" + body[end:] + body_preview = f"\n body: {body}" + except Exception: + body_preview = f"\n body: " + print(f"\n[{call_num[0]}] → {req.method} {req.url}{body_preview}") + resp = _orig_send(self, req, **kw) + print(f" ← {resp.status_code} {resp.reason}") + ctype = resp.headers.get("content-type", "") + if "json" in ctype and resp.content: + try: + body = json.loads(resp.content) + # Truncate long values + for k, v in list(body.items()): + if isinstance(v, str) and len(v) > 120: + body[k] = v[:120] + "..." + if isinstance(v, list) and len(v) > 8: + body[k] = v[:8] + ["..."] + print(f" response JSON: {json.dumps(body, indent=6)[:1400]}") + except Exception: + pass + return resp + +requests.adapters.HTTPAdapter.send = traced_send + + +CLIENT_ID = os.getenv("DATABRICKS_DOGFOOD_AZURE_CLIENT_ID") +CLIENT_SECRET = os.getenv("DATABRICKS_DOGFOOD_AZURE_CLIENT_SECRET") +if not (CLIENT_ID and CLIENT_SECRET): + print("Missing creds") + sys.exit(1) + +HOST = sys.argv[1] if len(sys.argv) > 1 else "dogfood-spog.staging.azuredatabricks.net" + +print(f"\n======== Tracing SDK M2M flow against: {HOST} ========\n") + +cfg = Config(host=f"https://{HOST}", client_id=CLIENT_ID, client_secret=CLIENT_SECRET) +print("\n>>> Config constructed, now calling oauth_service_principal()\n") +provider = oauth_service_principal(cfg) +print("\n>>> Provider created, now calling it to get headers\n") +headers = provider() +print("\n>>> Got headers (Authorization redacted in summary below)\n") + +# Print the final Authorization header preview +auth = headers.get("Authorization", "") +print(f"Final Authorization: {auth[:32]}...{auth[-16:]}") diff --git a/examples/spog_all_auth_test.py b/examples/spog_all_auth_test.py new file mode 100644 index 000000000..b6b9e3088 --- /dev/null +++ b/examples/spog_all_auth_test.py @@ -0,0 +1,435 @@ +""" +Python SQL Connector — Complete Auth Flow Test (M2M + U2M) + +Tests every auth path the Python connector supports, on both SPOG and legacy workspaces. + +M2M Paths: + 1. SDK M2M via credentials_provider (documented path) + → uses databricks-sdk oauth_service_principal + → sends to {host}/oidc/v1/token (Databricks OIDC) + → wrapped by TokenFederationProvider + + 2. auth_type="azure-sp-m2m" with explicit azure_tenant_id (undocumented) + → connector's own AzureServicePrincipalCredentialProvider + → sends to login.microsoftonline.com/{tenant}/oauth2/token + → wrapped by TokenFederationProvider + + 3. auth_type="azure-sp-m2m" without azure_tenant_id (auto-discover) + → connector calls GET {host}/aad/auth → extracts tenant from 302 redirect + → then same as path 2 + +U2M Paths: + 4. auth_type="databricks-oauth" (documented for non-Azure) + → connector's DatabricksOAuthProvider with InHouseOAuthEndpointCollection + → opens browser to {host}/oidc/oauth2/v2.0/authorize + + 5. auth_type="azure-oauth" (Azure-specific U2M) + → connector's DatabricksOAuthProvider with AzureOAuthEndpointCollection + → opens browser to {host}/oidc/oauth2/v2.0/authorize with Azure scopes + + 6. No auth params at all (default — falls through to U2M) + → same as path 4 if oauth_client_id and redirect_port are configured + +Workspaces: + Staging SPOG: dogfood-spog.staging.azuredatabricks.net + Staging Legacy: adb-7064161269814046.2.staging.azuredatabricks.net + Prod Legacy: adb-6436897454825492.12.azuredatabricks.net + +Usage: + source ~/.zshrc + cd databricks-sql-python + PYTHONPATH=src .venv/bin/python examples/spog_all_auth_test.py +""" + +import json +import logging +import os +import sys +import time +from typing import Optional + +from databricks import sql +from databricks.sdk.core import oauth_service_principal, Config + +# Enable debug logging to capture auth flow details +LOG_FILE = "/tmp/python_spog_auth_debug.log" +logging.basicConfig( + level=logging.DEBUG, + format="%(asctime)s %(levelname)-5s %(name)s: %(message)s", + handlers=[ + logging.FileHandler(LOG_FILE, mode="w"), + ], +) +# Suppress noisy libraries but keep auth-related ones +for name in ["urllib3", "urllib3.connectionpool", "urllib3.response", "thrift", "pyarrow"]: + logging.getLogger(name).setLevel(logging.WARNING) + +logger = logging.getLogger(__name__) + +# ============================================================================ +# Configuration +# ============================================================================ + +STG_SPOG = "dogfood-spog.staging.azuredatabricks.net" +STG_LEGACY = "adb-7064161269814046.2.staging.azuredatabricks.net" +STG_WH = "/sql/1.0/warehouses/e256699345d1ac74" +STG_SPOG_PATH = STG_WH + "?o=7064161269814046" +STG_TENANT = "e3fe3f22-4b98-4c04-82cc-d8817d1b17da" + +PROD_SPOG = "peco.azuredatabricks.net" +PROD_LEGACY = "adb-6436897454825492.12.azuredatabricks.net" +PROD_WH = "/sql/1.0/warehouses/00adc7b6c00429b8" +PROD_SPOG_PATH = PROD_WH + "?o=6436897454825492" +PROD_TENANT = "9f37a392-f0ae-4280-9796-f1864a10effc" + +# Credentials +STG_PAT = os.getenv("DATABRICKS_DOGFOOD_WESTUS_STAGING_TOKEN") + +# Staging: Databricks managed SP (dose secret) — for SDK M2M +STG_M2M_ID = os.getenv("DATABRICKS_DOGFOOD_AZURE_CLIENT_ID") # dc8dd813 +STG_M2M_SEC = os.getenv("DATABRICKS_DOGFOOD_AZURE_CLIENT_SECRET") # dose... + +# Staging: Azure AD app (Azure Portal secret) — for azure-sp-m2m +STG_ENTRA_ID = os.getenv("DATABRICKS_SPOG_ENTRA_TEST_CLIENT_ID") # d7f11108 +STG_ENTRA_SEC = os.getenv("DATABRICKS_SPOG_ENTRA_TEST_CLIENT_SECRET") # eAA8... + +# Prod +PROD_PAT = os.getenv("DATABRICKS_PECOTESTING_TOKEN_PERSONAL") + +# Prod: Entra managed SP (dose secret) — for SDK M2M +PROD_M2M_ID = os.getenv("DATABRICKS_PECOTESTING_DATABRICKS_CLIENT_ID_MSR_SPN") # a6f72159 +PROD_M2M_SEC = os.getenv("DATABRICKS_PECOTESTING_DATABRICKS_CLIENT_SECRET_MSR_SPN") + +# Prod: Azure AD app (Azure Portal secret) — for azure-sp-m2m +PROD_ENTRA_ID = os.getenv("DATABRICKS_AAD_CLIENT_ID") # d154b9ed +PROD_ENTRA_SEC = os.getenv("DATABRICKS_AAD_CLIENT_SECRET") # US28... + +U2M_ENABLED = os.getenv("DATABRICKS_SPOG_U2M_ENABLED") == "true" + +# ============================================================================ +# Test infrastructure +# ============================================================================ + +results = [] +COL = 65 + + +def record(name, status, elapsed=0, error=None): + results.append((name, status, elapsed, error)) + if sys.stdout.isatty(): + tag = {"PASS": "\033[32mPASS\033[0m", "FAIL": "\033[31mFAIL\033[0m", + "SKIP": "\033[33mSKIP\033[0m"}.get(status, status) + else: + tag = status + t = f"({elapsed:.1f}s)" if elapsed > 0 else "" + print(f" {name:<{COL}}{tag} {t}") + if error: + print(f" -> {str(error)[:300]}") + logger.info(f"TEST {status}: {name} {f'error={error}' if error else ''}") + + +def run(name, **kw): + """Connect, SELECT 1, verify, close.""" + logger.info(f"=== START TEST: {name} ===") + logger.info(f" connect kwargs: { {k: ('***' if 'secret' in k.lower() or 'token' in k.lower() else v) for k,v in kw.items()} }") + t0 = time.time() + try: + conn = sql.connect(**kw) + cur = conn.cursor() + cur.execute("SELECT 1") + row = cur.fetchone() + cur.close() + conn.close() + el = time.time() - t0 + if row and row[0] == 1: + record(name, "PASS", el) + else: + record(name, "FAIL", el, f"Expected 1, got {row}") + except Exception as e: + el = time.time() - t0 + record(name, "FAIL", el, f"{type(e).__name__}: {e}") + logger.info(f"=== END TEST: {name} ({time.time()-t0:.1f}s) ===\n") + + +def skip(name, reason): + record(name, "SKIP", error=reason) + + +def section(title): + print(f"\n{'='*80}") + print(f" {title}") + print(f"{'='*80}") + logger.info(f"\n{'='*60}\n {title}\n{'='*60}") + + +def make_sdk_provider(host, cid, csec): + """Build credentials_provider using databricks-sdk oauth_service_principal. + This is the documented M2M path from: + https://docs.databricks.com/en/dev-tools/python-sql-connector.html + and examples/m2m_oauth.py + """ + def credential_provider(): + config = Config(host=f"https://{host}", client_id=cid, client_secret=csec) + return oauth_service_principal(config) + return credential_provider + + +# ============================================================================ +# M2M Tests +# ============================================================================ + +def test_m2m_path1_sdk(): + """Path 1: SDK M2M via credentials_provider (documented). + Code: auth.py:18-19 → ExternalAuthProvider(credentials_provider) + Token from: SDK → OIDC discovery → {host}/oidc/v1/token + """ + section("M2M Path 1: SDK credentials_provider (documented)") + print(" Code: databricks.sdk.core.oauth_service_principal → {host}/oidc/v1/token") + print() + + # Staging + if STG_M2M_ID and STG_M2M_SEC: + run("Path1 SDK M2M (dc8dd813) | Stg SPOG", + server_hostname=STG_SPOG, http_path=STG_SPOG_PATH, + credentials_provider=make_sdk_provider(STG_SPOG, STG_M2M_ID, STG_M2M_SEC)) + run("Path1 SDK M2M (dc8dd813) | Stg Legacy", + server_hostname=STG_LEGACY, http_path=STG_WH, + credentials_provider=make_sdk_provider(STG_LEGACY, STG_M2M_ID, STG_M2M_SEC)) + else: + skip("Path1 SDK M2M | Stg", "DATABRICKS_DOGFOOD_AZURE_CLIENT_ID/SECRET not set") + + # Prod + if PROD_M2M_ID and PROD_M2M_SEC: + run("Path1 SDK M2M (a6f72159) | Prod Legacy", + server_hostname=PROD_LEGACY, http_path=PROD_WH, + credentials_provider=make_sdk_provider(PROD_LEGACY, PROD_M2M_ID, PROD_M2M_SEC)) + else: + skip("Path1 SDK M2M | Prod", "DATABRICKS_PECOTESTING_DATABRICKS_CLIENT_ID_MSR_SPN not set") + + +def test_m2m_path2_azure_sp_explicit(): + """Path 2: auth_type='azure-sp-m2m' with explicit tenant (undocumented). + Code: auth.py:20-30 → AzureServicePrincipalCredentialProvider + Token from: login.microsoftonline.com/{tenant}/oauth2/token + """ + section("M2M Path 2: azure-sp-m2m + explicit tenant (undocumented)") + print(" Code: authenticators.py:161 AzureServicePrincipalCredentialProvider") + print(" Token from: login.microsoftonline.com/{tenant}/oauth2/token") + print() + + # Staging: d7f11108 in staging tenant + if STG_ENTRA_ID and STG_ENTRA_SEC: + run("Path2 AzureSP (d7f11108+tenant) | Stg SPOG", + server_hostname=STG_SPOG, http_path=STG_SPOG_PATH, + auth_type="azure-sp-m2m", + azure_client_id=STG_ENTRA_ID, azure_client_secret=STG_ENTRA_SEC, + azure_tenant_id=STG_TENANT) + run("Path2 AzureSP (d7f11108+tenant) | Stg Legacy", + server_hostname=STG_LEGACY, http_path=STG_WH, + auth_type="azure-sp-m2m", + azure_client_id=STG_ENTRA_ID, azure_client_secret=STG_ENTRA_SEC, + azure_tenant_id=STG_TENANT) + else: + skip("Path2 AzureSP | Stg", "DATABRICKS_SPOG_ENTRA_TEST not set") + + # Prod: d154b9ed in prod tenant + if PROD_ENTRA_ID and PROD_ENTRA_SEC: + run("Path2 AzureSP (d154b9ed+tenant) | Prod Legacy", + server_hostname=PROD_LEGACY, http_path=PROD_WH, + auth_type="azure-sp-m2m", + azure_client_id=PROD_ENTRA_ID, azure_client_secret=PROD_ENTRA_SEC, + azure_tenant_id=PROD_TENANT) + run("Path2 AzureSP (d154b9ed+tenant) | Prod SPOG", + server_hostname=PROD_SPOG, http_path=PROD_SPOG_PATH, + auth_type="azure-sp-m2m", + azure_client_id=PROD_ENTRA_ID, azure_client_secret=PROD_ENTRA_SEC, + azure_tenant_id=PROD_TENANT) + else: + skip("Path2 AzureSP | Prod", "DATABRICKS_AAD_CLIENT not set") + + +def test_m2m_path3_azure_sp_auto(): + """Path 3: auth_type='azure-sp-m2m' without tenant (auto-discover). + Code: authenticators.py:201 → get_azure_tenant_id_from_host(hostname) + common.py:106 → GET {host}/aad/auth → extracts tenant from 302 redirect + """ + section("M2M Path 3: azure-sp-m2m + auto-discover tenant (undocumented)") + print(" Code: common.py:106 get_azure_tenant_id_from_host → GET {host}/aad/auth") + print() + + # Staging + if STG_ENTRA_ID and STG_ENTRA_SEC: + run("Path3 AzureSP auto-tenant (d7f11108) | Stg SPOG", + server_hostname=STG_SPOG, http_path=STG_SPOG_PATH, + auth_type="azure-sp-m2m", + azure_client_id=STG_ENTRA_ID, azure_client_secret=STG_ENTRA_SEC) + run("Path3 AzureSP auto-tenant (d7f11108) | Stg Legacy", + server_hostname=STG_LEGACY, http_path=STG_WH, + auth_type="azure-sp-m2m", + azure_client_id=STG_ENTRA_ID, azure_client_secret=STG_ENTRA_SEC) + else: + skip("Path3 AzureSP auto | Stg", "DATABRICKS_SPOG_ENTRA_TEST not set") + + # Prod + if PROD_ENTRA_ID and PROD_ENTRA_SEC: + run("Path3 AzureSP auto-tenant (d154b9ed) | Prod Legacy", + server_hostname=PROD_LEGACY, http_path=PROD_WH, + auth_type="azure-sp-m2m", + azure_client_id=PROD_ENTRA_ID, azure_client_secret=PROD_ENTRA_SEC) + run("Path3 AzureSP auto-tenant (d154b9ed) | Prod SPOG", + server_hostname=PROD_SPOG, http_path=PROD_SPOG_PATH, + auth_type="azure-sp-m2m", + azure_client_id=PROD_ENTRA_ID, azure_client_secret=PROD_ENTRA_SEC) + else: + skip("Path3 AzureSP auto | Prod", "DATABRICKS_AAD_CLIENT not set") + + +# ============================================================================ +# U2M Tests +# ============================================================================ + +def test_u2m_path4_databricks_oauth(): + """Path 4: auth_type='databricks-oauth' (documented U2M). + Code: auth.py:31-44 → DatabricksOAuthProvider + endpoint.py: InHouseOAuthEndpointCollection (on Azure .azuredatabricks.net) + Opens browser to {host}/oidc/oauth2/v2.0/authorize + Scopes: ["sql", "offline_access"] + """ + section("U2M Path 4: auth_type='databricks-oauth' (documented)") + print(" Code: authenticators.py:56 DatabricksOAuthProvider → browser flow") + print(" Scopes: sql offline_access") + print() + + if not U2M_ENABLED: + skip("Path4 U2M databricks-oauth | Stg SPOG", "DATABRICKS_SPOG_U2M_ENABLED != true") + skip("Path4 U2M databricks-oauth | Stg Legacy", "DATABRICKS_SPOG_U2M_ENABLED != true") + return + + run("Path4 U2M databricks-oauth | Stg SPOG", + server_hostname=STG_SPOG, http_path=STG_SPOG_PATH, + auth_type="databricks-oauth") + run("Path4 U2M databricks-oauth | Stg Legacy", + server_hostname=STG_LEGACY, http_path=STG_WH, + auth_type="databricks-oauth") + + +def test_u2m_path5_azure_oauth(): + """Path 5: auth_type='azure-oauth' (Azure U2M). + Code: auth.py:31-44 → DatabricksOAuthProvider + endpoint.py: AzureOAuthEndpointCollection + Opens browser to {host}/oidc/oauth2/v2.0/authorize + Scopes: ["2ff814a6.../user_impersonation", "offline_access"] + """ + section("U2M Path 5: auth_type='azure-oauth'") + print(" Code: authenticators.py:56 DatabricksOAuthProvider → browser flow") + print(" Scopes: 2ff814a6.../user_impersonation offline_access") + print() + + if not U2M_ENABLED: + skip("Path5 U2M azure-oauth | Stg SPOG", "DATABRICKS_SPOG_U2M_ENABLED != true") + skip("Path5 U2M azure-oauth | Stg Legacy", "DATABRICKS_SPOG_U2M_ENABLED != true") + return + + run("Path5 U2M azure-oauth | Stg SPOG", + server_hostname=STG_SPOG, http_path=STG_SPOG_PATH, + auth_type="azure-oauth") + run("Path5 U2M azure-oauth | Stg Legacy", + server_hostname=STG_LEGACY, http_path=STG_WH, + auth_type="azure-oauth") + + +def test_u2m_path6_default(): + """Path 6: No auth params — connector falls through to U2M. + Code: auth.py:51-64 → DatabricksOAuthProvider (fallback) + Only if oauth_client_id and oauth_redirect_port_range are configured. + In practice, this opens a browser (same as databricks-oauth). + """ + section("U2M Path 6: No auth params (default fallback)") + print(" Code: auth.py:51-64 → falls through to DatabricksOAuthProvider") + print() + + if not U2M_ENABLED: + skip("Path6 U2M default | Stg Legacy", "DATABRICKS_SPOG_U2M_ENABLED != true") + return + + run("Path6 U2M default | Stg Legacy", + server_hostname=STG_LEGACY, http_path=STG_WH) + + +# ============================================================================ +# PAT (baseline) +# ============================================================================ + +def test_pat(): + section("PAT (baseline)") + + if STG_PAT: + run("PAT | Stg SPOG", + server_hostname=STG_SPOG, http_path=STG_SPOG_PATH, access_token=STG_PAT) + run("PAT | Stg Legacy", + server_hostname=STG_LEGACY, http_path=STG_WH, access_token=STG_PAT) + else: + skip("PAT | Stg", "DATABRICKS_DOGFOOD_WESTUS_STAGING_TOKEN not set") + + if PROD_PAT: + run("PAT | Prod Legacy", + server_hostname=PROD_LEGACY, http_path=PROD_WH, access_token=PROD_PAT) + run("PAT | Prod SPOG", + server_hostname=PROD_SPOG, http_path=PROD_SPOG_PATH, access_token=PROD_PAT) + else: + skip("PAT | Prod", "DATABRICKS_PECOTESTING_TOKEN_PERSONAL not set") + + +# ============================================================================ +# Main +# ============================================================================ + +def main(): + print() + print("=" * 80) + print(" Python SQL Connector — Complete Auth Flow Test") + print(f" Debug log: {LOG_FILE}") + print("=" * 80) + print() + print(" Credentials:") + print(f" STG PAT ................. {'YES' if STG_PAT else 'NO'}") + print(f" STG SDK M2M (dc8dd813) .. {'YES' if STG_M2M_ID else 'NO'} (dose → Databricks OIDC)") + print(f" STG Entra (d7f11108) .... {'YES' if STG_ENTRA_ID else 'NO'} (Azure secret → Azure AD)") + print(f" PROD PAT ................ {'YES' if PROD_PAT else 'NO'}") + print(f" PROD SDK M2M (a6f72159) . {'YES' if PROD_M2M_ID else 'NO'} (dose → Databricks OIDC)") + print(f" PROD Entra (d154b9ed) ... {'YES' if PROD_ENTRA_ID else 'NO'} (Azure secret → Azure AD)") + print(f" U2M (browser) ........... {'YES' if U2M_ENABLED else 'NO (set DATABRICKS_SPOG_U2M_ENABLED=true)'}") + + # Run all tests + test_pat() + test_m2m_path1_sdk() + test_m2m_path2_azure_sp_explicit() + test_m2m_path3_azure_sp_auto() + test_u2m_path4_databricks_oauth() + test_u2m_path5_azure_oauth() + test_u2m_path6_default() + + # Summary + print(f"\n{'='*80}") + print(" SUMMARY") + print(f"{'='*80}") + p = sum(1 for _, s, _, _ in results if s == "PASS") + f = sum(1 for _, s, _, _ in results if s == "FAIL") + sk = sum(1 for _, s, _, _ in results if s == "SKIP") + print(f" Total: {len(results)} | PASS: {p} | FAIL: {f} | SKIP: {sk}") + + if f: + print(f"\n FAILURES:") + for n, s, _, e in results: + if s == "FAIL": + print(f" - {n}") + if e: + print(f" {str(e)[:250]}") + + print(f"\n Debug log: {LOG_FILE}") + print() + sys.exit(1 if f else 0) + + +if __name__ == "__main__": + main() diff --git a/examples/spog_full_matrix.py b/examples/spog_full_matrix.py new file mode 100644 index 000000000..6fb8dbd83 --- /dev/null +++ b/examples/spog_full_matrix.py @@ -0,0 +1,153 @@ +""" +Python SQL Connector — Full SPOG Matrix Test + +Tests all auth flows on BOTH staging and prod workspaces with correct credentials. + +Usage: + source ~/.zshrc + cd databricks-sql-python + PYTHONPATH=src .venv/bin/python examples/spog_full_matrix.py +""" + +import os, sys, time +from databricks import sql +from databricks.sdk.core import oauth_service_principal, Config + +# === Workspaces === +STG_SPOG = "dogfood-spog.staging.azuredatabricks.net" +STG_LEGACY = "adb-7064161269814046.2.staging.azuredatabricks.net" +STG_WH = "/sql/1.0/warehouses/e256699345d1ac74" +STG_SPOG_PATH = STG_WH + "?o=7064161269814046" +STG_TENANT = "e3fe3f22-4b98-4c04-82cc-d8817d1b17da" + +PROD_SPOG = "peco.azuredatabricks.net" +PROD_LEGACY = "adb-6436897454825492.12.azuredatabricks.net" +PROD_WH = "/sql/1.0/warehouses/00adc7b6c00429b8" +PROD_SPOG_PATH = PROD_WH + "?o=6436897454825492" +PROD_TENANT = "9f37a392-f0ae-4280-9796-f1864a10effc" + +# === Credentials === +STG_PAT = os.getenv("DATABRICKS_DOGFOOD_WESTUS_STAGING_TOKEN") +STG_M2M_ID = os.getenv("DATABRICKS_DOGFOOD_AZURE_CLIENT_ID") +STG_M2M_SEC = os.getenv("DATABRICKS_DOGFOOD_AZURE_CLIENT_SECRET") +STG_ENTRA_ID = os.getenv("DATABRICKS_SPOG_ENTRA_TEST_CLIENT_ID") # d7f11108, staging tenant +STG_ENTRA_SEC = os.getenv("DATABRICKS_SPOG_ENTRA_TEST_CLIENT_SECRET") + +PROD_PAT = os.getenv("DATABRICKS_PECOTESTING_TOKEN_PERSONAL") +PROD_M2M_ID = os.getenv("DATABRICKS_PECOTESTING_DATABRICKS_CLIENT_ID_MSR_SPN") # a6f72159 +PROD_M2M_SEC = os.getenv("DATABRICKS_PECOTESTING_DATABRICKS_CLIENT_SECRET_MSR_SPN") +PROD_ENTRA_ID = os.getenv("DATABRICKS_AAD_CLIENT_ID") # d154b9ed, prod tenant +PROD_ENTRA_SEC = os.getenv("DATABRICKS_AAD_CLIENT_SECRET") + +results = [] +COL = 65 + +def record(name, status, elapsed=0, error=None): + results.append((name, status, elapsed, error)) + tag = {"PASS": "\033[32mPASS\033[0m", "FAIL": "\033[31mFAIL\033[0m", "SKIP": "\033[33mSKIP\033[0m"}.get(status, status) + t = f"({elapsed:.1f}s)" if elapsed > 0 else "" + print(f" {name:<{COL}}{tag} {t}") + if error: print(f" -> {str(error)[:250]}") + +def run(name, **kw): + t0 = time.time() + try: + conn = sql.connect(**kw) + cur = conn.cursor(); cur.execute("SELECT 1"); row = cur.fetchone(); cur.close(); conn.close() + el = time.time() - t0 + record(name, "PASS" if row and row[0] == 1 else "FAIL", el, None if row and row[0] == 1 else f"got {row}") + except Exception as e: + record(name, "FAIL", time.time() - t0, f"{type(e).__name__}: {e}") + +def skip(name, reason): record(name, "SKIP", error=reason) + +def section(t): print(f"\n{'='*80}\n {t}\n{'='*80}") + +def make_sdk_provider(host, cid, csec): + def p(): + return oauth_service_principal(Config(host=f"https://{host}", client_id=cid, client_secret=csec)) + return p + +# ============================================================ +# STAGING +# ============================================================ +def test_staging(): + section("STAGING SPOG — dogfood-spog.staging.azuredatabricks.net") + print(f" Tenant: {STG_TENANT}\n") + + # PAT + if STG_PAT: + run("Stg | PAT | SPOG", server_hostname=STG_SPOG, http_path=STG_SPOG_PATH, access_token=STG_PAT) + run("Stg | PAT | Legacy", server_hostname=STG_LEGACY, http_path=STG_WH, access_token=STG_PAT) + else: skip("Stg | PAT", "no token") + + # SDK M2M (Databricks OIDC) + if STG_M2M_ID and STG_M2M_SEC: + run("Stg | SDK M2M (dc8dd813) | SPOG", server_hostname=STG_SPOG, http_path=STG_SPOG_PATH, + credentials_provider=make_sdk_provider(STG_SPOG, STG_M2M_ID, STG_M2M_SEC)) + run("Stg | SDK M2M (dc8dd813) | Legacy", server_hostname=STG_LEGACY, http_path=STG_WH, + credentials_provider=make_sdk_provider(STG_LEGACY, STG_M2M_ID, STG_M2M_SEC)) + else: skip("Stg | SDK M2M", "no creds") + + # Azure AD M2M (d7f11108, staging tenant) + if STG_ENTRA_ID and STG_ENTRA_SEC: + run("Stg | Azure AD (d7f11108, explicit tenant) | SPOG", server_hostname=STG_SPOG, http_path=STG_SPOG_PATH, + auth_type="azure-sp-m2m", azure_client_id=STG_ENTRA_ID, azure_client_secret=STG_ENTRA_SEC, azure_tenant_id=STG_TENANT) + run("Stg | Azure AD (d7f11108, explicit tenant) | Legacy", server_hostname=STG_LEGACY, http_path=STG_WH, + auth_type="azure-sp-m2m", azure_client_id=STG_ENTRA_ID, azure_client_secret=STG_ENTRA_SEC, azure_tenant_id=STG_TENANT) + run("Stg | Azure AD (d7f11108, auto-tenant) | Legacy", server_hostname=STG_LEGACY, http_path=STG_WH, + auth_type="azure-sp-m2m", azure_client_id=STG_ENTRA_ID, azure_client_secret=STG_ENTRA_SEC) + else: skip("Stg | Azure AD", "no creds") + +# ============================================================ +# PROD +# ============================================================ +def test_prod(): + section("PROD — peco.azuredatabricks.net") + print(f" Tenant: {PROD_TENANT}\n") + + # PAT + if PROD_PAT: + run("Prod | PAT | Legacy", server_hostname=PROD_LEGACY, http_path=PROD_WH, access_token=PROD_PAT) + else: skip("Prod | PAT", "no token") + + # SDK M2M (a6f72159, Databricks OIDC) + if PROD_M2M_ID and PROD_M2M_SEC: + run("Prod | SDK M2M (a6f72159) | Legacy", server_hostname=PROD_LEGACY, http_path=PROD_WH, + credentials_provider=make_sdk_provider(PROD_LEGACY, PROD_M2M_ID, PROD_M2M_SEC)) + else: skip("Prod | SDK M2M", "no creds") + + # Azure AD M2M (d154b9ed, prod tenant) + if PROD_ENTRA_ID and PROD_ENTRA_SEC: + run("Prod | Azure AD (d154b9ed, explicit tenant) | Legacy", server_hostname=PROD_LEGACY, http_path=PROD_WH, + auth_type="azure-sp-m2m", azure_client_id=PROD_ENTRA_ID, azure_client_secret=PROD_ENTRA_SEC, azure_tenant_id=PROD_TENANT) + run("Prod | Azure AD (d154b9ed, auto-tenant) | Legacy", server_hostname=PROD_LEGACY, http_path=PROD_WH, + auth_type="azure-sp-m2m", azure_client_id=PROD_ENTRA_ID, azure_client_secret=PROD_ENTRA_SEC) + else: skip("Prod | Azure AD", "no creds") + + # SDK M2M on prod with Entra managed SP dose secret (a6f72159) — via credentials_provider + if PROD_M2M_ID and PROD_M2M_SEC: + run("Prod | SDK M2M (a6f72159, dose) via provider | Legacy", server_hostname=PROD_LEGACY, http_path=PROD_WH, + credentials_provider=make_sdk_provider(PROD_LEGACY, PROD_M2M_ID, PROD_M2M_SEC)) + else: skip("Prod | SDK M2M provider", "no creds") + +# ============================================================ +def main(): + print("\n" + "="*80) + print(" Python SQL Connector — Full SPOG Matrix") + print("="*80) + test_staging() + test_prod() + print(f"\n{'='*80}\n SUMMARY\n{'='*80}") + p = sum(1 for _,s,_,_ in results if s=="PASS") + f = sum(1 for _,s,_,_ in results if s=="FAIL") + sk = sum(1 for _,s,_,_ in results if s=="SKIP") + print(f" Total: {len(results)} | PASS: {p} | FAIL: {f} | SKIP: {sk}") + if f: + print(f"\n FAILURES:") + for n,s,_,e in results: + if s=="FAIL": print(f" - {n}"); e and print(f" {str(e)[:200]}") + print() + sys.exit(1 if f else 0) + +if __name__ == "__main__": main() diff --git a/examples/spog_full_test.py b/examples/spog_full_test.py new file mode 100644 index 000000000..b2f0a4600 --- /dev/null +++ b/examples/spog_full_test.py @@ -0,0 +1,409 @@ +""" +Full SPOG (Single Panel of Glass) E2E test suite for Databricks SQL Python Connector. + +Targets an Azure production workspace. + +Tests: + 1. PAT authentication (Thrift + SEA) + 2. Databricks M2M OAuth (Thrift + SEA) + 3. U2M browser OAuth (skip if env not set) + 4. Azure Entra ID M2M (always SKIP) + 5. Telemetry endpoint (direct HTTP POST, verify 200) + 6. Feature flags endpoint (direct HTTP GET, verify 200) + +Workspace: + Host: adb-2220717038207243.3.azuredatabricks.net + Warehouse: /sql/1.0/warehouses/f45eefbaf9e766bd + Workspace ID: 2220717038207243 + +Env vars: + DATABRICKS_DOGFOOD_SPOG_TOKEN -- PAT + DATABRICKS_DOGFOOD_SPOG_CLIENT_ID -- Databricks M2M client ID + DATABRICKS_DOGFOOD_SPOG_CLIENT_SECRET -- Databricks M2M client secret + DATABRICKS_SPOG_U2M_ENABLED=true -- Enable U2M browser tests + +Usage: + cd /path/to/databricks-sql-python + PYTHONPATH=src .venv/bin/python examples/spog_full_test.py +""" + +import json +import os +import sys +import time +import urllib3 + +from databricks import sql + +# --------------------------------------------------------------------------- +# Configuration +# --------------------------------------------------------------------------- + +HOST = "adb-2220717038207243.3.azuredatabricks.net" +WAREHOUSE_PATH = "/sql/1.0/warehouses/f45eefbaf9e766bd" +WORKSPACE_ID = "2220717038207243" + +FF_DRIVER = "PYTHON" +FF_VERSION = "4.2.5" + +# Credentials from env +PAT = os.getenv("DATABRICKS_DOGFOOD_SPOG_TOKEN") + +M2M_CLIENT_ID = os.getenv("DATABRICKS_DOGFOOD_SPOG_CLIENT_ID") +M2M_CLIENT_SECRET = os.getenv("DATABRICKS_DOGFOOD_SPOG_CLIENT_SECRET") + +U2M_ENABLED = os.getenv("DATABRICKS_SPOG_U2M_ENABLED") == "true" + +# Counters +passed = 0 +failed = 0 +skipped = 0 +results = [] + +# Disable urllib3 SSL warnings +urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + +# --------------------------------------------------------------------------- +# Workspace-level OIDC credentials provider (no databricks-sdk) +# --------------------------------------------------------------------------- + + +def _workspace_oidc_token(host, client_id, client_secret): + """ + Fetch an OAuth token from the workspace-level OIDC endpoint. + POST https://{host}/oidc/v1/token with client_credentials grant. + Returns the access_token string. + """ + url = f"https://{host}/oidc/v1/token" + http = urllib3.PoolManager() + resp = http.request( + "POST", + url, + headers={"Content-Type": "application/x-www-form-urlencoded"}, + body=( + f"grant_type=client_credentials" + f"&client_id={client_id}" + f"&client_secret={client_secret}" + f"&scope=all-apis" + ), + ) + if resp.status != 200: + raise RuntimeError( + f"OIDC token request to {url} failed with status {resp.status}: " + f"{resp.data.decode('utf-8', errors='replace')[:300]}" + ) + data = json.loads(resp.data.decode("utf-8")) + return data["access_token"] + + +def make_databricks_m2m_provider(host, client_id, client_secret): + """ + Build a credentials_provider callable for the Python SQL connector. + The connector calls credentials_provider() to get a HeaderFactory, + and then calls HeaderFactory() to get a dict of HTTP headers. + """ + + def credentials_provider(): + token = _workspace_oidc_token(host, client_id, client_secret) + + def header_factory(): + return {"Authorization": f"Bearer {token}"} + + return header_factory + + return credentials_provider + + +# --------------------------------------------------------------------------- +# Test runner helpers +# --------------------------------------------------------------------------- + +COL_WIDTH = 60 + + +def _record(name, status, elapsed, error=None): + global passed, failed, skipped + if status == "PASS": + passed += 1 + elif status == "FAIL": + failed += 1 + elif status == "SKIP": + skipped += 1 + results.append((name, status, elapsed, error)) + + +def run_test(name, host, http_path, token=None, credentials_provider=None, + use_sea=False, auth_type=None, extra_kwargs=None): + """ + Connect, run SELECT 1, verify result == 1, close. + """ + print(f" {name:<{COL_WIDTH}}", end="", flush=True) + t0 = time.time() + try: + kwargs = { + "server_hostname": host, + "http_path": http_path, + } + if token: + kwargs["access_token"] = token + if credentials_provider: + kwargs["credentials_provider"] = credentials_provider + if use_sea: + kwargs["use_sea"] = True + if auth_type: + kwargs["auth_type"] = auth_type + if extra_kwargs: + kwargs.update(extra_kwargs) + + conn = sql.connect(**kwargs) + cursor = conn.cursor() + cursor.execute("SELECT 1 AS v") + row = cursor.fetchone() + cursor.close() + conn.close() + + elapsed = time.time() - t0 + + if row is None: + print(f"FAIL ({elapsed:.1f}s)") + print(f" -> fetchone() returned None") + _record(name, "FAIL", elapsed, "fetchone() returned None") + elif row[0] != 1: + print(f"FAIL ({elapsed:.1f}s)") + print(f" -> Expected 1, got {row[0]!r}") + _record(name, "FAIL", elapsed, f"Expected 1, got {row[0]!r}") + else: + print(f"PASS ({elapsed:.1f}s)") + _record(name, "PASS", elapsed) + + except Exception as e: + elapsed = time.time() - t0 + err_msg = f"{type(e).__name__}: {str(e)[:200]}" + print(f"FAIL ({elapsed:.1f}s)") + print(f" -> {err_msg}") + _record(name, "FAIL", elapsed, err_msg) + + +def skip_test(name, reason): + print(f" {name:<{COL_WIDTH}}SKIP ({reason})") + _record(name, "SKIP", 0.0, reason) + + +def section(title): + print(f"\n{'=' * 80}") + print(f" {title}") + print(f"{'=' * 80}") + + +# --------------------------------------------------------------------------- +# HTTP endpoint tests +# --------------------------------------------------------------------------- + +def test_telemetry(): + """Direct HTTP POST to /telemetry-ext, verify HTTP 200.""" + section("Telemetry Endpoint") + name = "Telemetry | POST /telemetry-ext" + print(f" {name:<{COL_WIDTH}}", end="", flush=True) + + if not PAT: + print("SKIP (DATABRICKS_DOGFOOD_SPOG_TOKEN not set)") + _record(name, "SKIP", 0.0, "DATABRICKS_DOGFOOD_SPOG_TOKEN not set") + return + + t0 = time.time() + try: + http = urllib3.PoolManager() + resp = http.request( + "POST", + f"https://{HOST}/telemetry-ext", + headers={ + "Authorization": f"Bearer {PAT}", + "Content-Type": "application/json", + }, + body=json.dumps({"items": [], "protoLogs": []}).encode("utf-8"), + timeout=30.0, + ) + elapsed = time.time() - t0 + if resp.status == 200: + print(f"PASS ({elapsed:.1f}s)") + _record(name, "PASS", elapsed) + else: + body = resp.data.decode("utf-8", errors="replace")[:200] + print(f"FAIL ({elapsed:.1f}s)") + print(f" -> HTTP {resp.status}: {body}") + _record(name, "FAIL", elapsed, f"HTTP {resp.status}: {body}") + except Exception as e: + elapsed = time.time() - t0 + err_msg = f"{type(e).__name__}: {str(e)[:200]}" + print(f"FAIL ({elapsed:.1f}s)") + print(f" -> {err_msg}") + _record(name, "FAIL", elapsed, err_msg) + + +def test_feature_flags(): + """Direct HTTP GET to /api/2.0/connector-service/feature-flags/{DRIVER}/{VERSION}, verify HTTP 200.""" + section("Feature Flags Endpoint") + path = f"/api/2.0/connector-service/feature-flags/{FF_DRIVER}/{FF_VERSION}" + name = f"Feature Flags | GET {path}" + print(f" {name:<{COL_WIDTH}}", end="", flush=True) + + if not PAT: + print("SKIP (DATABRICKS_DOGFOOD_SPOG_TOKEN not set)") + _record(name, "SKIP", 0.0, "DATABRICKS_DOGFOOD_SPOG_TOKEN not set") + return + + t0 = time.time() + try: + http = urllib3.PoolManager() + resp = http.request( + "GET", + f"https://{HOST}{path}", + headers={ + "Authorization": f"Bearer {PAT}", + }, + timeout=30.0, + ) + elapsed = time.time() - t0 + if resp.status == 200: + print(f"PASS ({elapsed:.1f}s)") + _record(name, "PASS", elapsed) + else: + body = resp.data.decode("utf-8", errors="replace")[:200] + print(f"FAIL ({elapsed:.1f}s)") + print(f" -> HTTP {resp.status}: {body}") + _record(name, "FAIL", elapsed, f"HTTP {resp.status}: {body}") + except Exception as e: + elapsed = time.time() - t0 + err_msg = f"{type(e).__name__}: {str(e)[:200]}" + print(f"FAIL ({elapsed:.1f}s)") + print(f" -> {err_msg}") + _record(name, "FAIL", elapsed, err_msg) + + +# --------------------------------------------------------------------------- +# Test groups +# --------------------------------------------------------------------------- + +def test_pat(): + """PAT auth, Thrift and SEA.""" + section("PAT Authentication") + if not PAT: + for label in [ + "PAT | Thrift", + "PAT | SEA", + ]: + skip_test(label, "DATABRICKS_DOGFOOD_SPOG_TOKEN not set") + return + + run_test("PAT | Thrift", HOST, WAREHOUSE_PATH, token=PAT) + run_test("PAT | SEA", HOST, WAREHOUSE_PATH, token=PAT, use_sea=True) + + +def test_databricks_m2m(): + """Databricks M2M (workspace OIDC), Thrift and SEA.""" + section("Databricks M2M OAuth") + have_creds = M2M_CLIENT_ID and M2M_CLIENT_SECRET + if not have_creds: + for label in [ + "Databricks M2M | Thrift", + "Databricks M2M | SEA", + ]: + skip_test(label, "DATABRICKS_DOGFOOD_SPOG_CLIENT_ID/SECRET not set") + return + + provider = make_databricks_m2m_provider(HOST, M2M_CLIENT_ID, M2M_CLIENT_SECRET) + + run_test("Databricks M2M | Thrift", HOST, WAREHOUSE_PATH, credentials_provider=provider) + run_test("Databricks M2M | SEA", HOST, WAREHOUSE_PATH, credentials_provider=provider, use_sea=True) + + +def test_u2m(): + """U2M (browser flow), Thrift only.""" + section("U2M (Browser) OAuth") + if not U2M_ENABLED: + skip_test("U2M | Thrift", "DATABRICKS_SPOG_U2M_ENABLED != true") + return + + run_test("U2M | Thrift", HOST, WAREHOUSE_PATH) + + +def test_entra_id_m2m(): + """Azure Entra ID M2M -- always SKIP.""" + section("Azure Entra ID M2M") + skip_test("Entra ID M2M | Thrift", "Entra ID credentials not configured") + skip_test("Entra ID M2M | SEA", "Entra ID credentials not configured") + + +# --------------------------------------------------------------------------- +# Summary +# --------------------------------------------------------------------------- + +def print_summary(): + print(f"\n{'=' * 80}") + print(" SUMMARY") + print(f"{'=' * 80}") + print(f" {'Test':<{COL_WIDTH}}{'Result':<8}{'Time':>6}") + print(f" {'-' * COL_WIDTH}{'-' * 8}{'-' * 6}") + for name, status, elapsed, error in results: + color = "" + reset = "" + if sys.stdout.isatty(): + if status == "PASS": + color = "\033[32m" + elif status == "FAIL": + color = "\033[31m" + elif status == "SKIP": + color = "\033[33m" + reset = "\033[0m" + time_str = f"{elapsed:.1f}s" if elapsed > 0 else "-" + print(f" {name:<{COL_WIDTH}}{color}{status:<8}{reset}{time_str:>6}") + print() + total = passed + failed + skipped + print(f" Total: {total} | Passed: {passed} | Failed: {failed} | Skipped: {skipped}") + + if failed > 0: + print(f"\n FAILED TESTS:") + for name, status, elapsed, error in results: + if status == "FAIL": + print(f" - {name}") + if error: + print(f" {error}") + print() + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +def main(): + print() + print("=" * 80) + print(" Databricks SQL Python Connector -- Azure SPOG E2E Test Suite") + print(f" Host: {HOST}") + print(f" Warehouse: {WAREHOUSE_PATH}") + print("=" * 80) + + # Show which credentials are available + print(f"\n Credentials detected:") + print(f" PAT .......................... {'YES' if PAT else 'NO'}") + print(f" Databricks M2M .............. {'YES' if (M2M_CLIENT_ID and M2M_CLIENT_SECRET) else 'NO'}") + print(f" U2M (browser) ............... {'YES' if U2M_ENABLED else 'NO'}") + print(f" Entra ID M2M ................ SKIP (not configured)") + + # Run all test groups + test_pat() + test_databricks_m2m() + test_u2m() + test_entra_id_m2m() + test_telemetry() + test_feature_flags() + + # Print summary table + print_summary() + + # Exit code + sys.exit(1 if failed > 0 else 0) + + +if __name__ == "__main__": + main() diff --git a/examples/spog_peco_test.py b/examples/spog_peco_test.py new file mode 100644 index 000000000..d333f5097 --- /dev/null +++ b/examples/spog_peco_test.py @@ -0,0 +1,440 @@ +""" +Full SPOG E2E test suite for Databricks SQL Python Connector. + +Tests all auth flows on BOTH SPOG and legacy Azure workspaces: + 1. PAT + 2. Databricks M2M (Databricks managed SP via workspace OIDC) + 3. Entra ID M2M (Azure AD app via login.microsoftonline.com, auth_type=azure-sp-m2m) + 4. U2M (browser OAuth - skipped by default) + 5. Telemetry endpoint + 6. Feature flags endpoint + +SPOG workspace: + Host: peco.azuredatabricks.net + Warehouse: /sql/1.0/warehouses/00adc7b6c00429b8?o=6436897454825492 + Workspace ID: 6436897454825492 + +Legacy workspace: + Host: adb-6436897454825492.12.azuredatabricks.net + Warehouse: /sql/1.0/warehouses/00adc7b6c00429b8 + +Env vars: + DATABRICKS_PECOTESTING_TOKEN -- PAT (works on both SPOG and legacy) + DATABRICKS_PECO_CLIENT_ID -- Databricks M2M client ID + DATABRICKS_PECO_CLIENT_SECRET -- Databricks M2M client secret + DATABRICKS_SPOG_ENTRA_TEST_CLIENT_ID -- Entra/Azure AD app client ID + DATABRICKS_SPOG_ENTRA_TEST_CLIENT_SECRET -- Entra/Azure AD app client secret + +Usage: + cd /path/to/databricks-sql-python + PYTHONPATH=src .venv/bin/python examples/spog_peco_test.py +""" + +import json +import os +import sys +import time +import urllib3 + +from databricks import sql + +# --------------------------------------------------------------------------- +# Configuration +# --------------------------------------------------------------------------- + +SPOG_HOST = "peco.azuredatabricks.net" +LEGACY_HOST = "adb-6436897454825492.12.azuredatabricks.net" +WORKSPACE_ID = "6436897454825492" + +SPOG_HTTP_PATH = f"/sql/1.0/warehouses/00adc7b6c00429b8?o={WORKSPACE_ID}" +LEGACY_HTTP_PATH = "/sql/1.0/warehouses/00adc7b6c00429b8" + +AZURE_TENANT_ID = "9f37a392-f0ae-4280-9796-f1864a10effc" + +FF_DRIVER = "PYTHON" +FF_VERSION = "4.2.5" + +# Credentials from env +# Try personal PAT first, fall back to regular +PAT = os.getenv("DATABRICKS_PECOTESTING_TOKEN_PERSONAL") or os.getenv("DATABRICKS_PECOTESTING_TOKEN") + +# Databricks managed SP for this workspace +M2M_CLIENT_ID = os.getenv("DATABRICKS_PECOTESTING_DATABRICKS_CLIENT_ID_MSR_SPN") +M2M_CLIENT_SECRET = os.getenv("DATABRICKS_PECOTESTING_DATABRICKS_CLIENT_SECRET_MSR_SPN") + +# Azure AD (Entra) app registered in this workspace's tenant +ENTRA_CLIENT_ID = os.getenv("DATABRICKS_PECOTESING_AAD_CLIENT_ID") +ENTRA_CLIENT_SECRET = os.getenv("DATABRICKS_PECOTESING_AAD_CLIENT_SECRET") + +U2M_ENABLED = os.getenv("DATABRICKS_SPOG_U2M_ENABLED") == "true" + +# Counters +passed = 0 +failed = 0 +skipped = 0 +results = [] + +# Disable urllib3 SSL warnings +urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + + +# --------------------------------------------------------------------------- +# Workspace-level OIDC credentials provider (Databricks M2M) +# --------------------------------------------------------------------------- + +def _workspace_oidc_token(host, client_id, client_secret): + """ + Fetch token from workspace-level OIDC endpoint. + POST https://{host}/oidc/v1/token with client_credentials grant. + """ + url = f"https://{host}/oidc/v1/token" + http = urllib3.PoolManager() + resp = http.request( + "POST", + url, + headers={"Content-Type": "application/x-www-form-urlencoded"}, + body=( + f"grant_type=client_credentials" + f"&client_id={client_id}" + f"&client_secret={client_secret}" + f"&scope=all-apis" + ), + ) + if resp.status != 200: + raise RuntimeError( + f"OIDC token request to {url} failed: HTTP {resp.status}: " + f"{resp.data.decode('utf-8', errors='replace')[:300]}" + ) + data = json.loads(resp.data.decode("utf-8")) + return data["access_token"] + + +def make_databricks_m2m_provider(host, client_id, client_secret): + """Build a credentials_provider callable for Databricks M2M.""" + def credentials_provider(): + token = _workspace_oidc_token(host, client_id, client_secret) + def header_factory(): + return {"Authorization": f"Bearer {token}"} + return header_factory + return credentials_provider + + +# --------------------------------------------------------------------------- +# Test runner helpers +# --------------------------------------------------------------------------- + +COL_WIDTH = 60 + + +def _record(name, status, elapsed, error=None): + global passed, failed, skipped + if status == "PASS": + passed += 1 + elif status == "FAIL": + failed += 1 + elif status == "SKIP": + skipped += 1 + results.append((name, status, elapsed, error)) + + +def run_test(name, host, http_path, token=None, credentials_provider=None, + auth_type=None, extra_kwargs=None): + """Connect, run SELECT 1, verify result == 1, close.""" + print(f" {name:<{COL_WIDTH}}", end="", flush=True) + t0 = time.time() + try: + kwargs = { + "server_hostname": host, + "http_path": http_path, + } + if token: + kwargs["access_token"] = token + if credentials_provider: + kwargs["credentials_provider"] = credentials_provider + if auth_type: + kwargs["auth_type"] = auth_type + if extra_kwargs: + kwargs.update(extra_kwargs) + + conn = sql.connect(**kwargs) + cursor = conn.cursor() + cursor.execute("SELECT 1 AS v") + row = cursor.fetchone() + cursor.close() + conn.close() + + elapsed = time.time() - t0 + + if row is None: + print(f"FAIL ({elapsed:.1f}s)") + print(f" -> fetchone() returned None") + _record(name, "FAIL", elapsed, "fetchone() returned None") + elif row[0] != 1: + print(f"FAIL ({elapsed:.1f}s)") + print(f" -> Expected 1, got {row[0]!r}") + _record(name, "FAIL", elapsed, f"Expected 1, got {row[0]!r}") + else: + print(f"PASS ({elapsed:.1f}s)") + _record(name, "PASS", elapsed) + + except Exception as e: + elapsed = time.time() - t0 + err_msg = f"{type(e).__name__}: {str(e)[:300]}" + print(f"FAIL ({elapsed:.1f}s)") + print(f" -> {err_msg}") + _record(name, "FAIL", elapsed, err_msg) + + +def skip_test(name, reason): + print(f" {name:<{COL_WIDTH}}SKIP ({reason})") + _record(name, "SKIP", 0.0, reason) + + +def section(title): + print(f"\n{'=' * 80}") + print(f" {title}") + print(f"{'=' * 80}") + + +# --------------------------------------------------------------------------- +# HTTP endpoint tests +# --------------------------------------------------------------------------- + +def test_telemetry(host, label_prefix=""): + """Direct HTTP POST to /telemetry-ext, verify HTTP 200.""" + name = f"{label_prefix}Telemetry | POST /telemetry-ext" + print(f" {name:<{COL_WIDTH}}", end="", flush=True) + + if not PAT: + print("SKIP (no PAT)") + _record(name, "SKIP", 0.0, "no PAT") + return + + t0 = time.time() + try: + http = urllib3.PoolManager() + resp = http.request( + "POST", + f"https://{host}/telemetry-ext", + headers={ + "Authorization": f"Bearer {PAT}", + "Content-Type": "application/json", + }, + body=json.dumps({"items": [], "protoLogs": []}).encode("utf-8"), + timeout=30.0, + ) + elapsed = time.time() - t0 + if resp.status == 200: + print(f"PASS ({elapsed:.1f}s)") + _record(name, "PASS", elapsed) + else: + body = resp.data.decode("utf-8", errors="replace")[:200] + print(f"FAIL ({elapsed:.1f}s)") + print(f" -> HTTP {resp.status}: {body}") + _record(name, "FAIL", elapsed, f"HTTP {resp.status}: {body}") + except Exception as e: + elapsed = time.time() - t0 + err_msg = f"{type(e).__name__}: {str(e)[:200]}" + print(f"FAIL ({elapsed:.1f}s)") + print(f" -> {err_msg}") + _record(name, "FAIL", elapsed, err_msg) + + +def test_feature_flags(host, label_prefix=""): + """Direct HTTP GET to feature flags endpoint.""" + path = f"/api/2.0/connector-service/feature-flags/{FF_DRIVER}/{FF_VERSION}" + name = f"{label_prefix}Feature Flags | GET {path}" + print(f" {name:<{COL_WIDTH}}", end="", flush=True) + + if not PAT: + print("SKIP (no PAT)") + _record(name, "SKIP", 0.0, "no PAT") + return + + t0 = time.time() + try: + http = urllib3.PoolManager() + resp = http.request( + "GET", + f"https://{host}{path}", + headers={"Authorization": f"Bearer {PAT}"}, + timeout=30.0, + ) + elapsed = time.time() - t0 + if resp.status == 200: + print(f"PASS ({elapsed:.1f}s)") + _record(name, "PASS", elapsed) + else: + body = resp.data.decode("utf-8", errors="replace")[:200] + print(f"FAIL ({elapsed:.1f}s)") + print(f" -> HTTP {resp.status}: {body}") + _record(name, "FAIL", elapsed, f"HTTP {resp.status}: {body}") + except Exception as e: + elapsed = time.time() - t0 + err_msg = f"{type(e).__name__}: {str(e)[:200]}" + print(f"FAIL ({elapsed:.1f}s)") + print(f" -> {err_msg}") + _record(name, "FAIL", elapsed, err_msg) + + +# --------------------------------------------------------------------------- +# Test groups +# --------------------------------------------------------------------------- + +def test_pat(host, http_path, label_prefix=""): + """PAT auth.""" + if not PAT: + skip_test(f"{label_prefix}PAT", "DATABRICKS_PECOTESTING_TOKEN not set") + return + run_test(f"{label_prefix}PAT", host, http_path, token=PAT) + + +def test_databricks_m2m(host, http_path, label_prefix=""): + """Databricks M2M (workspace OIDC).""" + if not (M2M_CLIENT_ID and M2M_CLIENT_SECRET): + skip_test(f"{label_prefix}Databricks M2M", "M2M creds not set") + return + + # For SPOG, the OIDC endpoint is on the legacy host (SPOG OIDC returns account-level tokens) + # We need to get the token from the legacy host and use it on SPOG + # Actually, let's try the M2M directly on the target host to see what happens + provider = make_databricks_m2m_provider(host, M2M_CLIENT_ID, M2M_CLIENT_SECRET) + run_test(f"{label_prefix}Databricks M2M (workspace OIDC)", host, http_path, + credentials_provider=provider) + + +def test_entra_m2m(host, http_path, label_prefix=""): + """Entra M2M (Azure AD SP via auth_type=azure-sp-m2m).""" + if not (ENTRA_CLIENT_ID and ENTRA_CLIENT_SECRET): + skip_test(f"{label_prefix}Entra M2M (azure-sp-m2m)", "Entra creds not set") + return + + # Test with explicit tenant ID + run_test( + f"{label_prefix}Entra M2M (explicit tenant)", + host, http_path, + auth_type="azure-sp-m2m", + extra_kwargs={ + "azure_client_id": ENTRA_CLIENT_ID, + "azure_client_secret": ENTRA_CLIENT_SECRET, + "azure_tenant_id": AZURE_TENANT_ID, + }, + ) + + # Test with auto-discovered tenant (via /aad/auth) + run_test( + f"{label_prefix}Entra M2M (auto-discover tenant)", + host, http_path, + auth_type="azure-sp-m2m", + extra_kwargs={ + "azure_client_id": ENTRA_CLIENT_ID, + "azure_client_secret": ENTRA_CLIENT_SECRET, + # azure_tenant_id omitted → connector calls /aad/auth to discover + }, + ) + + +def test_u2m(host, http_path, label_prefix=""): + """U2M (browser OAuth).""" + if not U2M_ENABLED: + skip_test(f"{label_prefix}U2M (browser)", "DATABRICKS_SPOG_U2M_ENABLED != true") + return + run_test(f"{label_prefix}U2M (browser)", host, http_path, + auth_type="databricks-oauth") + + +# --------------------------------------------------------------------------- +# Summary +# --------------------------------------------------------------------------- + +def print_summary(): + print(f"\n{'=' * 80}") + print(" SUMMARY") + print(f"{'=' * 80}") + print(f" {'Test':<{COL_WIDTH}}{'Result':<8}{'Time':>6}") + print(f" {'-' * COL_WIDTH}{'-' * 8}{'-' * 6}") + for name, status, elapsed, error in results: + color = "" + reset = "" + if sys.stdout.isatty(): + if status == "PASS": + color = "\033[32m" + elif status == "FAIL": + color = "\033[31m" + elif status == "SKIP": + color = "\033[33m" + reset = "\033[0m" + time_str = f"{elapsed:.1f}s" if elapsed > 0 else "-" + print(f" {name:<{COL_WIDTH}}{color}{status:<8}{reset}{time_str:>6}") + print() + total = passed + failed + skipped + print(f" Total: {total} | Passed: {passed} | Failed: {failed} | Skipped: {skipped}") + + if failed > 0: + print(f"\n FAILED TESTS:") + for name, status, elapsed, error in results: + if status == "FAIL": + print(f" - {name}") + if error: + print(f" {error}") + print() + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +def main(): + print() + print("=" * 80) + print(" Databricks SQL Python Connector — SPOG Peco E2E Test Suite") + print("=" * 80) + + # Show detected credentials + print(f"\n Credentials detected:") + print(f" PAT .......................... {'YES' if PAT else 'NO'}") + print(f" Databricks M2M .............. {'YES' if (M2M_CLIENT_ID and M2M_CLIENT_SECRET) else 'NO'}") + print(f" Entra ID M2M ................ {'YES' if (ENTRA_CLIENT_ID and ENTRA_CLIENT_SECRET) else 'NO'}") + print(f" U2M (browser) ............... {'YES' if U2M_ENABLED else 'NO'}") + + # ===================================================================== + # LEGACY workspace tests + # ===================================================================== + section(f"LEGACY — {LEGACY_HOST}") + print(f" HTTP path: {LEGACY_HTTP_PATH}\n") + + test_pat(LEGACY_HOST, LEGACY_HTTP_PATH, "Legacy | ") + test_databricks_m2m(LEGACY_HOST, LEGACY_HTTP_PATH, "Legacy | ") + test_entra_m2m(LEGACY_HOST, LEGACY_HTTP_PATH, "Legacy | ") + test_u2m(LEGACY_HOST, LEGACY_HTTP_PATH, "Legacy | ") + + # ===================================================================== + # SPOG workspace tests + # ===================================================================== + section(f"SPOG — {SPOG_HOST}") + print(f" HTTP path: {SPOG_HTTP_PATH}\n") + + test_pat(SPOG_HOST, SPOG_HTTP_PATH, "SPOG | ") + test_databricks_m2m(SPOG_HOST, SPOG_HTTP_PATH, "SPOG | ") + test_entra_m2m(SPOG_HOST, SPOG_HTTP_PATH, "SPOG | ") + test_u2m(SPOG_HOST, SPOG_HTTP_PATH, "SPOG | ") + + # ===================================================================== + # Endpoint tests (SPOG only — these are the interesting ones) + # ===================================================================== + section("Endpoints (SPOG)") + test_telemetry(SPOG_HOST, "SPOG | ") + test_feature_flags(SPOG_HOST, "SPOG | ") + + section("Endpoints (Legacy)") + test_telemetry(LEGACY_HOST, "Legacy | ") + test_feature_flags(LEGACY_HOST, "Legacy | ") + + # Print summary + print_summary() + + sys.exit(1 if failed > 0 else 0) + + +if __name__ == "__main__": + main() diff --git a/examples/spog_sdk_m2m_only.py b/examples/spog_sdk_m2m_only.py new file mode 100644 index 000000000..1204e5e19 --- /dev/null +++ b/examples/spog_sdk_m2m_only.py @@ -0,0 +1,58 @@ +"""Minimal SDK M2M test on staging SPOG — uses latest databricks-sdk (0.102.0).""" +import os +import sys +import time +from databricks import sql +from databricks.sdk.core import oauth_service_principal, Config +import databricks.sdk as _sdk + +print(f"databricks-sdk version: {_sdk.version.__version__}") + +SPOG_HOST = "dogfood-spog.staging.azuredatabricks.net" +LEGACY_HOST = "adb-7064161269814046.2.staging.azuredatabricks.net" +WAREHOUSE = "/sql/1.0/warehouses/e256699345d1ac74" +SPOG_PATH = WAREHOUSE + "?o=7064161269814046" + +client_id = os.getenv("DATABRICKS_DOGFOOD_AZURE_CLIENT_ID") +client_secret = os.getenv("DATABRICKS_DOGFOOD_AZURE_CLIENT_SECRET") +if not (client_id and client_secret): + print("Missing DATABRICKS_DOGFOOD_AZURE_CLIENT_ID/SECRET") + sys.exit(1) + + +def make_provider(hostname): + def provider(): + cfg = Config( + host=f"https://{hostname}", + client_id=client_id, + client_secret=client_secret, + ) + return oauth_service_principal(cfg) + + return provider + + +def run(label, host, path): + t0 = time.time() + try: + conn = sql.connect( + server_hostname=host, + http_path=path, + credentials_provider=make_provider(host), + ) + cur = conn.cursor() + cur.execute("SELECT 1") + row = cur.fetchone() + cur.close() + conn.close() + dt = time.time() - t0 + print(f"{label}: PASS ({dt:.1f}s) result={row[0]}") + except Exception as e: + dt = time.time() - t0 + msg = str(e)[:400] + print(f"{label}: FAIL ({dt:.1f}s)") + print(f" -> {type(e).__name__}: {msg}") + + +run("SDK M2M | Staging SPOG", SPOG_HOST, SPOG_PATH) +run("SDK M2M | Staging Legacy", LEGACY_HOST, WAREHOUSE) diff --git a/examples/spog_staging_test.py b/examples/spog_staging_test.py new file mode 100644 index 000000000..3a4d4567c --- /dev/null +++ b/examples/spog_staging_test.py @@ -0,0 +1,281 @@ +""" +Python SQL Connector — SPOG Staging E2E Test + +Tests all auth flows on the staging SPOG workspace. + +Auth flows tested: + 1. PAT (access_token) + 2. SDK M2M (credentials_provider via databricks-sdk oauth_service_principal) + → sends to {host}/oidc/v1/token (Databricks OIDC) + 3. Azure SP M2M (auth_type="azure-sp-m2m" with explicit tenant) + → sends to login.microsoftonline.com/{tenant}/oauth2/token (Azure AD) + 4. Azure SP M2M with auto-discovered tenant (no azure_tenant_id) + → calls /aad/auth to discover tenant, then sends to Azure AD + +Workspaces: + SPOG: dogfood-spog.staging.azuredatabricks.net + Legacy: adb-7064161269814046.2.staging.azuredatabricks.net + Warehouse: /sql/1.0/warehouses/e256699345d1ac74 + SPOG path: /sql/1.0/warehouses/e256699345d1ac74?o=7064161269814046 + Tenant: e3fe3f22-4b98-4c04-82cc-d8817d1b17da + +Env vars (from JDBC test): + DATABRICKS_DOGFOOD_WESTUS_STAGING_TOKEN — PAT + DATABRICKS_DOGFOOD_AZURE_CLIENT_ID/SECRET — Databricks managed SP (dose secret) + DATABRICKS_SPOG_ENTRA_TEST_CLIENT_ID/SECRET — Azure AD app (Azure Portal secret) + DATABRICKS_AAD_CLIENT_ID/SECRET — Azure AD app (Azure Portal secret) + +Usage: + source ~/.zshrc + cd databricks-sql-python + PYTHONPATH=src .venv/bin/python examples/spog_staging_test.py +""" + +import os +import sys +import time +import traceback + +# Must be first — so credentials_provider import works +from databricks import sql +from databricks.sdk.core import oauth_service_principal, Config + +# --------------------------------------------------------------------------- +# Configuration — matches OSS JDBC OssJdbcSpogFullTest.java +# --------------------------------------------------------------------------- + +SPOG_HOST = "dogfood-spog.staging.azuredatabricks.net" +LEGACY_HOST = "adb-7064161269814046.2.staging.azuredatabricks.net" +WAREHOUSE = "/sql/1.0/warehouses/e256699345d1ac74" +SPOG_PATH = WAREHOUSE + "?o=7064161269814046" +TENANT = "e3fe3f22-4b98-4c04-82cc-d8817d1b17da" + +# --------------------------------------------------------------------------- +# Credentials from env +# --------------------------------------------------------------------------- + +PAT = os.getenv("DATABRICKS_DOGFOOD_WESTUS_STAGING_TOKEN") + +# SDK M2M: Databricks managed SP with Databricks secret (dose...) +# Same as JDBC test: DATABRICKS_DOGFOOD_AZURE_CLIENT_ID/SECRET +SDK_M2M_ID = os.getenv("DATABRICKS_DOGFOOD_AZURE_CLIENT_ID") +SDK_M2M_SECRET = os.getenv("DATABRICKS_DOGFOOD_AZURE_CLIENT_SECRET") + +# Azure AD app with Azure Portal secret (eAA8...) +# Same as JDBC test: DATABRICKS_SPOG_ENTRA_TEST_CLIENT_ID/SECRET +ENTRA_ID = os.getenv("DATABRICKS_SPOG_ENTRA_TEST_CLIENT_ID") +ENTRA_SECRET = os.getenv("DATABRICKS_SPOG_ENTRA_TEST_CLIENT_SECRET") + +# Another Azure AD app with Azure Portal secret (US28...) +AAD_ID = os.getenv("DATABRICKS_AAD_CLIENT_ID") +AAD_SECRET = os.getenv("DATABRICKS_AAD_CLIENT_SECRET") + +# --------------------------------------------------------------------------- +# Test infrastructure +# --------------------------------------------------------------------------- + +results = [] +COL = 62 + + +def record(name, status, elapsed=0, error=None): + results.append((name, status, elapsed, error)) + tag = {"PASS": "\033[32mPASS\033[0m", "FAIL": "\033[31mFAIL\033[0m", "SKIP": "\033[33mSKIP\033[0m"}.get(status, status) + time_str = f"({elapsed:.1f}s)" if elapsed > 0 else "" + print(f" {name:<{COL}}{tag} {time_str}") + if error: + # Truncate long errors + err = str(error)[:300] + print(f" -> {err}") + + +def run_query_test(name, **connect_kwargs): + """Connect, SELECT 1, verify, close.""" + t0 = time.time() + try: + conn = sql.connect(**connect_kwargs) + cursor = conn.cursor() + cursor.execute("SELECT 1 AS v") + row = cursor.fetchone() + cursor.close() + conn.close() + elapsed = time.time() - t0 + if row and row[0] == 1: + record(name, "PASS", elapsed) + else: + record(name, "FAIL", elapsed, f"Expected 1, got {row}") + except Exception as e: + elapsed = time.time() - t0 + record(name, "FAIL", elapsed, f"{type(e).__name__}: {e}") + + +def skip(name, reason): + record(name, "SKIP", error=reason) + + +def section(title): + print(f"\n{'=' * 80}") + print(f" {title}") + print(f"{'=' * 80}") + + +# --------------------------------------------------------------------------- +# SDK M2M credentials_provider builder +# Exactly like examples/m2m_oauth.py +# --------------------------------------------------------------------------- + +def make_sdk_m2m_provider(hostname, client_id, client_secret): + """Build credentials_provider using databricks-sdk oauth_service_principal.""" + def credential_provider(): + config = Config( + host=f"https://{hostname}", + client_id=client_id, + client_secret=client_secret, + ) + return oauth_service_principal(config) + return credential_provider + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + +def test_pat(): + section("1. PAT Authentication") + if not PAT: + skip("PAT | SPOG", "DATABRICKS_DOGFOOD_WESTUS_STAGING_TOKEN not set") + skip("PAT | Legacy", "") + return + run_query_test("PAT | SPOG", + server_hostname=SPOG_HOST, http_path=SPOG_PATH, access_token=PAT) + run_query_test("PAT | Legacy", + server_hostname=LEGACY_HOST, http_path=WAREHOUSE, access_token=PAT) + + +def test_sdk_m2m(): + section("2. SDK M2M (Databricks OIDC via oauth_service_principal)") + if not (SDK_M2M_ID and SDK_M2M_SECRET): + skip("SDK M2M | SPOG", "DATABRICKS_DOGFOOD_AZURE_CLIENT_ID/SECRET not set") + skip("SDK M2M | Legacy", "") + return + + provider = make_sdk_m2m_provider(SPOG_HOST, SDK_M2M_ID, SDK_M2M_SECRET) + run_query_test("SDK M2M (dc8dd813) | SPOG", + server_hostname=SPOG_HOST, http_path=SPOG_PATH, + credentials_provider=provider) + + provider_legacy = make_sdk_m2m_provider(LEGACY_HOST, SDK_M2M_ID, SDK_M2M_SECRET) + run_query_test("SDK M2M (dc8dd813) | Legacy", + server_hostname=LEGACY_HOST, http_path=WAREHOUSE, + credentials_provider=provider_legacy) + + +def test_azure_sp_m2m(): + section("3. Azure SP M2M (login.microsoftonline.com via auth_type=azure-sp-m2m)") + + # 3a. SPOG_ENTRA_TEST — Azure AD app (d7f11108) with Azure secret (eAA8...) + if ENTRA_ID and ENTRA_SECRET: + run_query_test( + "Azure SP M2M (d7f11108, explicit tenant) | SPOG", + server_hostname=SPOG_HOST, http_path=SPOG_PATH, + auth_type="azure-sp-m2m", + azure_client_id=ENTRA_ID, + azure_client_secret=ENTRA_SECRET, + azure_tenant_id=TENANT, + ) + run_query_test( + "Azure SP M2M (d7f11108, explicit tenant) | Legacy", + server_hostname=LEGACY_HOST, http_path=WAREHOUSE, + auth_type="azure-sp-m2m", + azure_client_id=ENTRA_ID, + azure_client_secret=ENTRA_SECRET, + azure_tenant_id=TENANT, + ) + else: + skip("Azure SP M2M (d7f11108) | SPOG", "DATABRICKS_SPOG_ENTRA_TEST_CLIENT_ID/SECRET not set") + skip("Azure SP M2M (d7f11108) | Legacy", "") + + # 3b. AAD_CLIENT — Azure AD app (d154b9ed) with Azure secret (US28...) + if AAD_ID and AAD_SECRET: + run_query_test( + "Azure SP M2M (d154b9ed, explicit tenant) | SPOG", + server_hostname=SPOG_HOST, http_path=SPOG_PATH, + auth_type="azure-sp-m2m", + azure_client_id=AAD_ID, + azure_client_secret=AAD_SECRET, + azure_tenant_id=TENANT, + ) + run_query_test( + "Azure SP M2M (d154b9ed, explicit tenant) | Legacy", + server_hostname=LEGACY_HOST, http_path=WAREHOUSE, + auth_type="azure-sp-m2m", + azure_client_id=AAD_ID, + azure_client_secret=AAD_SECRET, + azure_tenant_id=TENANT, + ) + else: + skip("Azure SP M2M (d154b9ed) | SPOG", "DATABRICKS_AAD_CLIENT_ID/SECRET not set") + skip("Azure SP M2M (d154b9ed) | Legacy", "") + + +def test_azure_sp_m2m_auto_tenant(): + section("4. Azure SP M2M — auto-discover tenant via /aad/auth") + # On staging, /aad/auth returns 403, so this is expected to fail + if ENTRA_ID and ENTRA_SECRET: + run_query_test( + "Azure SP M2M (d7f11108, auto-tenant) | Legacy", + server_hostname=LEGACY_HOST, http_path=WAREHOUSE, + auth_type="azure-sp-m2m", + azure_client_id=ENTRA_ID, + azure_client_secret=ENTRA_SECRET, + # azure_tenant_id omitted — connector calls /aad/auth + ) + else: + skip("Azure SP M2M (auto-tenant) | Legacy", "no Entra creds") + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +def main(): + print() + print("=" * 80) + print(" Python SQL Connector — SPOG Staging E2E Test") + print(f" SPOG: {SPOG_HOST}") + print(f" Legacy: {LEGACY_HOST}") + print(f" Warehouse: {WAREHOUSE}") + print("=" * 80) + print() + print(" Credentials:") + print(f" PAT ........................ {'YES' if PAT else 'NO'}") + print(f" SDK M2M (dc8dd813) ......... {'YES' if SDK_M2M_ID else 'NO'} (dose secret → Databricks OIDC)") + print(f" Entra (d7f11108) ........... {'YES' if ENTRA_ID else 'NO'} (Azure secret → login.microsoftonline.com)") + print(f" AAD (d154b9ed) ............. {'YES' if AAD_ID else 'NO'} (Azure secret → login.microsoftonline.com)") + + test_pat() + test_sdk_m2m() + test_azure_sp_m2m() + test_azure_sp_m2m_auto_tenant() + + # Summary + print(f"\n{'=' * 80}") + print(" SUMMARY") + print(f"{'=' * 80}") + p = sum(1 for _, s, _, _ in results if s == "PASS") + f = sum(1 for _, s, _, _ in results if s == "FAIL") + s = sum(1 for _, s, _, _ in results if s == "SKIP") + print(f" Total: {len(results)} | PASS: {p} | FAIL: {f} | SKIP: {s}") + if f: + print(f"\n FAILURES:") + for name, status, _, err in results: + if status == "FAIL": + print(f" - {name}") + if err: + print(f" {str(err)[:200]}") + print() + sys.exit(1 if f > 0 else 0) + + +if __name__ == "__main__": + main() diff --git a/examples/spog_test.py b/examples/spog_test.py new file mode 100644 index 000000000..14d494265 --- /dev/null +++ b/examples/spog_test.py @@ -0,0 +1,284 @@ +""" +SPOG (Single Panel of Glass) tests for Databricks SQL Python Connector. + +Usage: + cd /path/to/databricks-sql-python + PYTHONPATH=src DATABRICKS_SPOG_TOKEN= .venv/bin/python examples/spog_test.py +""" + +import json +import os +import sys +import time + +from databricks import sql +from databricks.sql.common.http import HttpMethod +from databricks.sql.common.url_utils import normalize_host_with_protocol + +SPOG_HOST = "e2-spog.staging.cloud.databricks.com" +LEGACY_HOST = "e2-dogfood.staging.cloud.databricks.com" +WAREHOUSE_HTTP_PATH = "/sql/1.0/warehouses/dd43ee29fedd958d" +WORKSPACE_ID = "6051921418418893" +WAREHOUSE_HTTP_PATH_SPOG = f"{WAREHOUSE_HTTP_PATH}?o={WORKSPACE_ID}" +GP_CLUSTER_PATH = os.getenv("DATABRICKS_SPOG_GP_CLUSTER_PATH") +M2M_CLIENT_ID = os.getenv("DATABRICKS_SPOG_M2M_CLIENT_ID") +M2M_CLIENT_SECRET = os.getenv("DATABRICKS_SPOG_M2M_CLIENT_SECRET") +PAT = os.getenv("DATABRICKS_SPOG_TOKEN") + +passed = 0 +failed = 0 +skipped = 0 + + +def test(name, host, http_path, token=None, credentials_provider=None, + use_sea=False, expect_fail=False): + global passed, failed + print(f"{name:<60}", end="", flush=True) + t = time.time() + try: + kwargs = { + "server_hostname": host, + "http_path": http_path, + } + if token: + kwargs["access_token"] = token + if credentials_provider: + kwargs["credentials_provider"] = credentials_provider + if use_sea: + kwargs["use_sea"] = True + + conn = sql.connect(**kwargs) + cursor = conn.cursor() + cursor.execute("SELECT 1 AS v") + row = cursor.fetchone() + cursor.close() + conn.close() + + if expect_fail: + print(f"FAIL (expected failure but got result: {row})") + failed += 1 + elif row[0] == 1: + print(f"PASS ({time.time() - t:.1f}s)") + passed += 1 + else: + print(f"FAIL (unexpected: {row})") + failed += 1 + except Exception as e: + if expect_fail: + print(f"PASS (expected failure, {time.time() - t:.1f}s)") + passed += 1 + else: + print(f"FAIL ({time.time() - t:.1f}s)") + print(f" {type(e).__name__}: {str(e)[:200]}") + failed += 1 + + +def test_custom(name, test_fn): + global passed, failed + print(f"{name:<60}", end="", flush=True) + t = time.time() + try: + test_fn() + print(f"PASS ({time.time() - t:.1f}s)") + passed += 1 + except Exception as e: + print(f"FAIL ({time.time() - t:.1f}s)") + print(f" {type(e).__name__}: {str(e)[:200]}") + failed += 1 + + +def skip(name, reason): + global skipped + print(f"{name:<60}SKIPPED ({reason})") + skipped += 1 + + +def verify_telemetry(conn, label=""): + """Directly hit the telemetry endpoint using the connection's auth and assert 200.""" + session = conn.session + host_url = normalize_host_with_protocol(session.host) + url = f"{host_url}/telemetry-ext" + + headers = {"Accept": "application/json", "Content-Type": "application/json"} + session.auth_provider.add_headers(headers) + headers.update(session.get_spog_headers()) + + # Send a minimal valid telemetry payload + payload = json.dumps({"uploadTime": int(time.time() * 1000), "items": [], "protoLogs": []}) + response = session.http_client.request(HttpMethod.POST, url, headers=headers, body=payload, timeout=30) + assert response.status == 200, f"Telemetry {label} returned {response.status}, expected 200" + + +def verify_feature_flags(conn, label=""): + """Directly hit the feature flags endpoint using the connection's auth and assert 200.""" + from databricks.sql import __version__ + session = conn.session + host_url = normalize_host_with_protocol(session.host) + url = f"{host_url}/api/2.0/connector-service/feature-flags/PYTHON/{__version__}" + + headers = {} + session.auth_provider.add_headers(headers) + headers["User-Agent"] = session.useragent_header + headers.update(session.get_spog_headers()) + + response = session.http_client.request(HttpMethod.GET, url, headers=headers, timeout=30) + assert response.status == 200, f"Feature flags {label} returned {response.status}, expected 200" + + +def test_with_endpoints(name, host, http_path, token=None, credentials_provider=None, + use_sea=False): + """Test data path + explicitly assert telemetry and feature flags return 200.""" + global passed, failed + print(f"{name:<60}", end="", flush=True) + t = time.time() + try: + kwargs = {"server_hostname": host, "http_path": http_path} + if token: + kwargs["access_token"] = token + if credentials_provider: + kwargs["credentials_provider"] = credentials_provider + if use_sea: + kwargs["use_sea"] = True + + conn = sql.connect(**kwargs) + + # 1. Verify data path + cursor = conn.cursor() + cursor.execute("SELECT 1 AS v") + row = cursor.fetchone() + assert row[0] == 1, f"Expected 1, got {row[0]}" + cursor.close() + + # 2. Verify telemetry endpoint returns 200 + verify_telemetry(conn, label=name) + + # 3. Verify feature flags endpoint returns 200 + verify_feature_flags(conn, label=name) + + conn.close() + print(f"PASS ({time.time() - t:.1f}s)") + passed += 1 + except Exception as e: + print(f"FAIL ({time.time() - t:.1f}s)") + print(f" {type(e).__name__}: {str(e)[:300]}") + failed += 1 + + +def m2m_cred_provider(host): + from databricks.sdk.core import oauth_service_principal, Config + def provider(): + config = Config( + host=f"https://{host}", + client_id=M2M_CLIENT_ID, + client_secret=M2M_CLIENT_SECRET, + ) + return oauth_service_principal(config) + return provider + + +def main(): + if not PAT: + print("ERROR: Set DATABRICKS_SPOG_TOKEN") + sys.exit(1) + + print("=== Python SQL Connector SPOG Tests ===\n") + + # --- DBSQL + PAT + Thrift --- + test("Legacy + Thrift (PAT)", LEGACY_HOST, WAREHOUSE_HTTP_PATH, token=PAT) + test("SPOG + ?o= + Thrift (PAT)", SPOG_HOST, WAREHOUSE_HTTP_PATH_SPOG, token=PAT) + test("SPOG no routing + Thrift (expect fail)", SPOG_HOST, WAREHOUSE_HTTP_PATH, + token=PAT, expect_fail=True) + test("Legacy + ?o= + Thrift (PAT)", LEGACY_HOST, WAREHOUSE_HTTP_PATH_SPOG, token=PAT) + + # --- DBSQL + PAT + SEA --- + test("Legacy + SEA (PAT)", LEGACY_HOST, WAREHOUSE_HTTP_PATH, token=PAT, use_sea=True) + test("SPOG + ?o= + SEA (PAT)", SPOG_HOST, WAREHOUSE_HTTP_PATH_SPOG, token=PAT, use_sea=True) + test("SPOG no routing + SEA (expect fail)", SPOG_HOST, WAREHOUSE_HTTP_PATH, + token=PAT, use_sea=True, expect_fail=True) + test("Legacy + ?o= + SEA (PAT)", LEGACY_HOST, WAREHOUSE_HTTP_PATH_SPOG, + token=PAT, use_sea=True) + + # --- GP Cluster --- + if GP_CLUSTER_PATH: + test("SPOG GP Cluster (PAT, Thrift)", SPOG_HOST, GP_CLUSTER_PATH, token=PAT) + test("Legacy GP Cluster (PAT, Thrift)", LEGACY_HOST, GP_CLUSTER_PATH, token=PAT) + else: + skip("SPOG GP Cluster", "DATABRICKS_SPOG_GP_CLUSTER_PATH") + skip("Legacy GP Cluster", "DATABRICKS_SPOG_GP_CLUSTER_PATH") + + # --- OAuth M2M --- + try: + from databricks.sdk.core import oauth_service_principal, Config + has_sdk = True + except ImportError: + has_sdk = False + + if M2M_CLIENT_ID and M2M_CLIENT_SECRET and has_sdk: + test("SPOG + ?o= + M2M (Thrift)", SPOG_HOST, WAREHOUSE_HTTP_PATH_SPOG, + credentials_provider=m2m_cred_provider(SPOG_HOST)) + test("SPOG + ?o= + M2M (SEA)", SPOG_HOST, WAREHOUSE_HTTP_PATH_SPOG, + credentials_provider=m2m_cred_provider(SPOG_HOST), use_sea=True) + test("Legacy + M2M (Thrift)", LEGACY_HOST, WAREHOUSE_HTTP_PATH, + credentials_provider=m2m_cred_provider(LEGACY_HOST)) + elif not has_sdk: + skip("SPOG M2M (Thrift)", "databricks-sdk not installed") + skip("SPOG M2M (SEA)", "databricks-sdk not installed") + skip("Legacy M2M", "databricks-sdk not installed") + else: + skip("SPOG M2M (Thrift)", "M2M creds") + skip("SPOG M2M (SEA)", "M2M creds") + skip("Legacy M2M", "M2M creds") + + # --- Telemetry + Feature Flags (PAT, explicit endpoint verification) --- + test_with_endpoints("SPOG + Telemetry + FF (PAT, Thrift)", SPOG_HOST, + WAREHOUSE_HTTP_PATH_SPOG, token=PAT) + test_with_endpoints("SPOG + Telemetry + FF (PAT, SEA)", SPOG_HOST, + WAREHOUSE_HTTP_PATH_SPOG, token=PAT, use_sea=True) + + # --- OAuth M2M with Telemetry + Feature Flags --- + if M2M_CLIENT_ID and M2M_CLIENT_SECRET and has_sdk: + test_with_endpoints("SPOG + Telemetry + FF (M2M, Thrift)", SPOG_HOST, + WAREHOUSE_HTTP_PATH_SPOG, + credentials_provider=m2m_cred_provider(SPOG_HOST)) + test_with_endpoints("SPOG + Telemetry + FF (M2M, SEA)", SPOG_HOST, + WAREHOUSE_HTTP_PATH_SPOG, + credentials_provider=m2m_cred_provider(SPOG_HOST), use_sea=True) + else: + skip("SPOG + Telemetry + FF (M2M, Thrift)", "M2M creds or SDK") + skip("SPOG + Telemetry + FF (M2M, SEA)", "M2M creds or SDK") + + # --- OAuth U2M with Telemetry + Feature Flags (browser login) --- + if os.getenv("DATABRICKS_SPOG_U2M_ENABLED") == "true": + test_with_endpoints("SPOG + Telemetry + FF (U2M, Thrift)", SPOG_HOST, + WAREHOUSE_HTTP_PATH_SPOG) + test_with_endpoints("SPOG + Telemetry + FF (U2M, SEA)", SPOG_HOST, + WAREHOUSE_HTTP_PATH_SPOG, use_sea=True) + test("Legacy + U2M (Thrift)", LEGACY_HOST, WAREHOUSE_HTTP_PATH) + else: + skip("SPOG + Telemetry + FF (U2M, Thrift)", "DATABRICKS_SPOG_U2M_ENABLED") + skip("SPOG + Telemetry + FF (U2M, SEA)", "DATABRICKS_SPOG_U2M_ENABLED") + skip("Legacy U2M (Thrift)", "DATABRICKS_SPOG_U2M_ENABLED") + + # --- Volume LIST --- + def volume_list(host, path, use_sea=False): + def _test(): + kwargs = {"server_hostname": host, "http_path": path, "access_token": PAT} + if use_sea: + kwargs["use_sea"] = True + conn = sql.connect(**kwargs) + cursor = conn.cursor() + cursor.execute("LIST '/Volumes/main/default/_'") + rows = cursor.fetchall() + assert len(rows) > 0, f"Expected rows, got {len(rows)}" + cursor.close() + conn.close() + return _test + + test_custom("SPOG Volume LIST (Thrift)", volume_list(SPOG_HOST, WAREHOUSE_HTTP_PATH_SPOG)) + test_custom("Legacy Volume LIST (Thrift)", volume_list(LEGACY_HOST, WAREHOUSE_HTTP_PATH)) + + print(f"\n=== Results: {passed} passed, {failed} failed, {skipped} skipped ===") + + +if __name__ == "__main__": + main() diff --git a/examples/spog_u2m_one.py b/examples/spog_u2m_one.py new file mode 100644 index 000000000..b66dbdd86 --- /dev/null +++ b/examples/spog_u2m_one.py @@ -0,0 +1,36 @@ +"""Minimal pysql U2M (databricks-oauth) test — one host per invocation. + +Usage: .venv/bin/python examples/spog_u2m_one.py +""" +import os, sys, time +from databricks import sql + +TARGET = sys.argv[1] if len(sys.argv) > 1 else "spog" +if TARGET == "spog": + HOST = "dogfood-spog.staging.azuredatabricks.net" + PATH = "/sql/1.0/warehouses/e256699345d1ac74?o=7064161269814046" +elif TARGET == "prod-legacy": + HOST = "adb-6436897454825492.12.azuredatabricks.net" + PATH = "/sql/1.0/warehouses/00adc7b6c00429b8" +else: + HOST = "adb-7064161269814046.2.staging.azuredatabricks.net" + PATH = "/sql/1.0/warehouses/e256699345d1ac74" + +print(f"Python U2M ({TARGET}) — browser will open, complete login.") +t0 = time.time() +try: + with sql.connect( + server_hostname=HOST, + http_path=PATH, + auth_type="databricks-oauth", + ) as conn: + cur = conn.cursor() + cur.execute("SELECT 12345678 AS v") + row = cur.fetchone() + cur.close() + if row and row[0] == 12345678: + print(f"U2M {TARGET}: PASS ({time.time()-t0:.1f}s)") + else: + print(f"U2M {TARGET}: FAIL — unexpected {row!r}") +except Exception as e: + print(f"U2M {TARGET}: FAIL — {type(e).__name__}: {str(e)[:300]}") diff --git a/examples/u2m_debug_trace.txt b/examples/u2m_debug_trace.txt new file mode 100644 index 000000000..627fd6442 --- /dev/null +++ b/examples/u2m_debug_trace.txt @@ -0,0 +1,74 @@ +# U2M Debug Trace — captures exact authorize URL, callback response, token endpoint, everything +# Run ONE AT A TIME + +# ============================================================ +# Trace A: azure-oauth on Staging Legacy (staging scope) — PASSES +# ============================================================ +cd ~/Desktop/databricks-sql-python && DATABRICKS_AZURE_TENANT_ID=4a67d088-db5c-48f1-9ff2-0aace800ae68 PYTHONPATH=src .venv/bin/python -c " +import logging, os, time +logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)-5s %(name)s: %(message)s', + handlers=[logging.FileHandler('/tmp/u2m_trace_legacy.log', mode='w'), logging.StreamHandler()]) +for n in ['urllib3.connectionpool', 'urllib3.response', 'thrift', 'pyarrow']: + logging.getLogger(n).setLevel(logging.WARNING) + +from databricks import sql +print('\n=== Trace A: azure-oauth | Stg Legacy with staging scope ===') +t0 = time.time() +try: + conn = sql.connect(server_hostname='adb-7064161269814046.2.staging.azuredatabricks.net', + http_path='/sql/1.0/warehouses/e256699345d1ac74', auth_type='azure-oauth') + cur = conn.cursor(); cur.execute('SELECT 1'); print('RESULT:', cur.fetchone()[0]); cur.close(); conn.close() + print(f'PASS ({time.time()-t0:.1f}s)') +except Exception as e: + print(f'FAIL ({time.time()-t0:.1f}s)') + print(f' {type(e).__name__}: {e}') +print(f'\nFull log: /tmp/u2m_trace_legacy.log') +" + +# ============================================================ +# Trace B: azure-oauth on Staging SPOG (staging scope) — FAILS +# ============================================================ +cd ~/Desktop/databricks-sql-python && DATABRICKS_AZURE_TENANT_ID=4a67d088-db5c-48f1-9ff2-0aace800ae68 PYTHONPATH=src .venv/bin/python -c " +import logging, os, time +logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)-5s %(name)s: %(message)s', + handlers=[logging.FileHandler('/tmp/u2m_trace_spog.log', mode='w'), logging.StreamHandler()]) +for n in ['urllib3.connectionpool', 'urllib3.response', 'thrift', 'pyarrow']: + logging.getLogger(n).setLevel(logging.WARNING) + +from databricks import sql +print('\n=== Trace B: azure-oauth | Stg SPOG with staging scope ===') +t0 = time.time() +try: + conn = sql.connect(server_hostname='dogfood-spog.staging.azuredatabricks.net', + http_path='/sql/1.0/warehouses/e256699345d1ac74?o=7064161269814046', auth_type='azure-oauth') + cur = conn.cursor(); cur.execute('SELECT 1'); print('RESULT:', cur.fetchone()[0]); cur.close(); conn.close() + print(f'PASS ({time.time()-t0:.1f}s)') +except Exception as e: + print(f'FAIL ({time.time()-t0:.1f}s)') + print(f' {type(e).__name__}: {e}') +print(f'\nFull log: /tmp/u2m_trace_spog.log') +" + +# ============================================================ +# Trace C: databricks-oauth on Staging SPOG — PASSES (for comparison) +# ============================================================ +cd ~/Desktop/databricks-sql-python && PYTHONPATH=src .venv/bin/python -c " +import logging, os, time +logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)-5s %(name)s: %(message)s', + handlers=[logging.FileHandler('/tmp/u2m_trace_spog_inhouse.log', mode='w'), logging.StreamHandler()]) +for n in ['urllib3.connectionpool', 'urllib3.response', 'thrift', 'pyarrow']: + logging.getLogger(n).setLevel(logging.WARNING) + +from databricks import sql +print('\n=== Trace C: databricks-oauth | Stg SPOG (InHouse scopes) ===') +t0 = time.time() +try: + conn = sql.connect(server_hostname='dogfood-spog.staging.azuredatabricks.net', + http_path='/sql/1.0/warehouses/e256699345d1ac74?o=7064161269814046', auth_type='databricks-oauth') + cur = conn.cursor(); cur.execute('SELECT 1'); print('RESULT:', cur.fetchone()[0]); cur.close(); conn.close() + print(f'PASS ({time.time()-t0:.1f}s)') +except Exception as e: + print(f'FAIL ({time.time()-t0:.1f}s)') + print(f' {type(e).__name__}: {e}') +print(f'\nFull log: /tmp/u2m_trace_spog_inhouse.log') +" diff --git a/examples/u2m_interactive_tests.txt b/examples/u2m_interactive_tests.txt new file mode 100644 index 000000000..83e4dc969 --- /dev/null +++ b/examples/u2m_interactive_tests.txt @@ -0,0 +1,120 @@ +# Python SQL Connector — U2M Interactive Tests +# Run each test ONE AT A TIME from your terminal (not background). +# Each opens a browser window — login and wait for result. +# +# All 4 tests use the connector's NATIVE U2M (no SDK). +# Code path: auth.py:31-44 → DatabricksOAuthProvider → authenticators.py:56-148 +# +# cd ~/Desktop/databricks-sql-python before running. + +# ============================================================ +# Test 1: databricks-oauth on Legacy +# Code: InHouseOAuthEndpointCollection → scopes: sql offline_access +# Authorize: {host}/oidc/oauth2/v2.0/authorize +# Expected: PASS +# ============================================================ +cd ~/Desktop/databricks-sql-python && PYTHONPATH=src .venv/bin/python -c " +from databricks import sql +print('Test 1: databricks-oauth | Legacy') +conn = sql.connect(server_hostname='adb-7064161269814046.2.staging.azuredatabricks.net', http_path='/sql/1.0/warehouses/e256699345d1ac74', auth_type='databricks-oauth') +cur = conn.cursor(); cur.execute('SELECT 1'); print('RESULT:', cur.fetchone()[0]); cur.close(); conn.close() +print('PASS') +" + +# ============================================================ +# Test 2: databricks-oauth on SPOG +# Code: InHouseOAuthEndpointCollection → scopes: sql offline_access +# Authorize: {spog-host}/oidc/oauth2/v2.0/authorize +# Expected: PASS or FAIL (server-side SPOG issue) +# ============================================================ +cd ~/Desktop/databricks-sql-python && PYTHONPATH=src .venv/bin/python -c " +from databricks import sql +print('Test 2: databricks-oauth | SPOG') +conn = sql.connect(server_hostname='dogfood-spog.staging.azuredatabricks.net', http_path='/sql/1.0/warehouses/e256699345d1ac74?o=7064161269814046', auth_type='databricks-oauth') +cur = conn.cursor(); cur.execute('SELECT 1'); print('RESULT:', cur.fetchone()[0]); cur.close(); conn.close() +print('PASS') +" + +# ============================================================ +# Test 3: azure-oauth on Legacy +# Code: AzureOAuthEndpointCollection → scopes: 2ff814a6.../user_impersonation offline_access +# Authorize: {host}/oidc/oauth2/v2.0/authorize (with Azure client_id 96eecda7) +# Expected: PASS or FAIL +# ============================================================ +cd ~/Desktop/databricks-sql-python && PYTHONPATH=src .venv/bin/python -c " +from databricks import sql +print('Test 3: azure-oauth | Legacy') +conn = sql.connect(server_hostname='adb-7064161269814046.2.staging.azuredatabricks.net', http_path='/sql/1.0/warehouses/e256699345d1ac74', auth_type='azure-oauth') +cur = conn.cursor(); cur.execute('SELECT 1'); print('RESULT:', cur.fetchone()[0]); cur.close(); conn.close() +print('PASS') +" + +# ============================================================ +# Test 4: azure-oauth on SPOG +# Code: AzureOAuthEndpointCollection → scopes: 2ff814a6.../user_impersonation offline_access +# Authorize: {spog-host}/oidc/oauth2/v2.0/authorize (with Azure client_id 96eecda7) +# Expected: PASS or FAIL +# ============================================================ +cd ~/Desktop/databricks-sql-python && PYTHONPATH=src .venv/bin/python -c " +from databricks import sql +print('Test 4: azure-oauth | SPOG') +conn = sql.connect(server_hostname='dogfood-spog.staging.azuredatabricks.net', http_path='/sql/1.0/warehouses/e256699345d1ac74?o=7064161269814046', auth_type='azure-oauth') +cur = conn.cursor(); cur.execute('SELECT 1'); print('RESULT:', cur.fetchone()[0]); cur.close(); conn.close() +print('PASS') +" + +# ============================================================ +# Test 5: No auth params (default — falls through to databricks-oauth) +# Code: auth.py:51-64 → same as databricks-oauth +# Expected: PASS (same as Test 1) +# ============================================================ +cd ~/Desktop/databricks-sql-python && PYTHONPATH=src .venv/bin/python -c " +from databricks import sql +print('Test 5: default (no auth) | Legacy') +conn = sql.connect(server_hostname='adb-7064161269814046.2.staging.azuredatabricks.net', http_path='/sql/1.0/warehouses/e256699345d1ac74') +cur = conn.cursor(); cur.execute('SELECT 1'); print('RESULT:', cur.fetchone()[0]); cur.close(); conn.close() +print('PASS') +" + +# ============================================================ +# Test 6: azure-oauth on SPOG with InHouse client_id override +# Purpose: Isolate whether SPOG azure-oauth stuck is caused by +# the Azure client_id (96eecda7) or the Azure scopes. +# Uses azure-oauth (Azure scopes 4a67d088.../user_impersonation) +# but overrides client_id to databricks-sql-python (which worked in Test 2). +# If PASS → problem is client_id 96eecda7 on SPOG +# If STUCK → problem is Azure scopes on SPOG +# ============================================================ +cd ~/Desktop/databricks-sql-python && DATABRICKS_AZURE_TENANT_ID=4a67d088-db5c-48f1-9ff2-0aace800ae68 PYTHONPATH=src .venv/bin/python -c " +from databricks import sql +print('Test 6: azure-oauth | SPOG with client_id=databricks-sql-python + staging scope') +conn = sql.connect(server_hostname='dogfood-spog.staging.azuredatabricks.net', http_path='/sql/1.0/warehouses/e256699345d1ac74?o=7064161269814046', auth_type='azure-oauth', oauth_client_id='databricks-sql-python') +cur = conn.cursor(); cur.execute('SELECT 1'); print('RESULT:', cur.fetchone()[0]); cur.close(); conn.close() +print('PASS') +" + +# ============================================================ +# Test 7: azure-oauth on Staging Legacy with staging scope override +# Purpose: Verify staging azure-oauth works with correct scope +# Expected: PASS +# ============================================================ +cd ~/Desktop/databricks-sql-python && DATABRICKS_AZURE_TENANT_ID=4a67d088-db5c-48f1-9ff2-0aace800ae68 PYTHONPATH=src .venv/bin/python -c " +from databricks import sql +print('Test 7: azure-oauth | Staging Legacy with staging scope') +conn = sql.connect(server_hostname='adb-7064161269814046.2.staging.azuredatabricks.net', http_path='/sql/1.0/warehouses/e256699345d1ac74', auth_type='azure-oauth') +cur = conn.cursor(); cur.execute('SELECT 1'); print('RESULT:', cur.fetchone()[0]); cur.close(); conn.close() +print('PASS') +" + +# ============================================================ +# Test 8: azure-oauth on Prod Legacy (default scope 2ff814a6) +# Purpose: Verify prod azure-oauth works with default scope +# Expected: PASS +# ============================================================ +cd ~/Desktop/databricks-sql-python && PYTHONPATH=src .venv/bin/python -c " +from databricks import sql +print('Test 8: azure-oauth | Prod Legacy') +conn = sql.connect(server_hostname='adb-6436897454825492.12.azuredatabricks.net', http_path='/sql/1.0/warehouses/00adc7b6c00429b8', auth_type='azure-oauth') +cur = conn.cursor(); cur.execute('SELECT 1'); print('RESULT:', cur.fetchone()[0]); cur.close(); conn.close() +print('PASS') +" diff --git a/examples/u2m_prod_spog_test.txt b/examples/u2m_prod_spog_test.txt new file mode 100644 index 000000000..ae8c77fb3 --- /dev/null +++ b/examples/u2m_prod_spog_test.txt @@ -0,0 +1,40 @@ +# U2M Tests on Prod SPOG (peco.azuredatabricks.net) +# Run each ONE AT A TIME from terminal. + +# ============================================================ +# Test 9: databricks-oauth on Prod SPOG +# Scopes: sql offline_access (InHouse) +# Expected: ? (prod SPOG flag may not be enabled) +# ============================================================ +cd ~/Desktop/databricks-sql-python && PYTHONPATH=src .venv/bin/python -c " +from databricks import sql +print('Test 9: databricks-oauth | Prod SPOG') +conn = sql.connect(server_hostname='peco.azuredatabricks.net', http_path='/sql/1.0/warehouses/00adc7b6c00429b8?o=6436897454825492', auth_type='databricks-oauth') +cur = conn.cursor(); cur.execute('SELECT 1'); print('RESULT:', cur.fetchone()[0]); cur.close(); conn.close() +print('PASS') +" + +# ============================================================ +# Test 10: azure-oauth on Prod SPOG (default scope 2ff814a6) +# Scopes: 2ff814a6.../user_impersonation offline_access +# Expected: STUCK or FAIL (server-side SPOG issue) +# ============================================================ +cd ~/Desktop/databricks-sql-python && PYTHONPATH=src .venv/bin/python -c " +from databricks import sql +print('Test 10: azure-oauth | Prod SPOG') +conn = sql.connect(server_hostname='peco.azuredatabricks.net', http_path='/sql/1.0/warehouses/00adc7b6c00429b8?o=6436897454825492', auth_type='azure-oauth') +cur = conn.cursor(); cur.execute('SELECT 1'); print('RESULT:', cur.fetchone()[0]); cur.close(); conn.close() +print('PASS') +" + +# ============================================================ +# Test 11: default (no auth) on Prod SPOG +# Falls through to databricks-oauth +# ============================================================ +cd ~/Desktop/databricks-sql-python && PYTHONPATH=src .venv/bin/python -c " +from databricks import sql +print('Test 11: default (no auth) | Prod SPOG') +conn = sql.connect(server_hostname='peco.azuredatabricks.net', http_path='/sql/1.0/warehouses/00adc7b6c00429b8?o=6436897454825492') +cur = conn.cursor(); cur.execute('SELECT 1'); print('RESULT:', cur.fetchone()[0]); cur.close(); conn.close() +print('PASS') +" diff --git a/examples/verify_spog_thrift_accepts_account_token.py b/examples/verify_spog_thrift_accepts_account_token.py new file mode 100644 index 000000000..bb4349a73 --- /dev/null +++ b/examples/verify_spog_thrift_accepts_account_token.py @@ -0,0 +1,95 @@ +"""Isolate server vs connector: get an SDK-issued M2M token on Stg SPOG, +then POST directly to the Thrift endpoint with it — skip pysql's TokenFederationProvider. + +If Thrift returns 200 → server DOES accept the accounts-shaped iss → the +previous failure was the pysql connector's fault (TokenFederationProvider trying to +exchange unnecessarily) — not the server's. + +If Thrift returns 4xx → server rejects the accounts-shaped iss → SPOG team's +claim that it works is inconsistent with what we observe on this workspace. +""" +import base64 +import json +import os +import struct +import sys + +import requests +from databricks.sdk.core import Config, oauth_service_principal + +CLIENT_ID = os.getenv("DATABRICKS_DOGFOOD_AZURE_CLIENT_ID") +CLIENT_SECRET = os.getenv("DATABRICKS_DOGFOOD_AZURE_CLIENT_SECRET") + +HOST = "dogfood-spog.staging.azuredatabricks.net" +HTTP_PATH = "/sql/1.0/warehouses/e256699345d1ac74?o=7064161269814046" + + +def decode_jwt(token): + parts = token.split(".") + pl = parts[1] + "=" * (-len(parts[1]) % 4) + return json.loads(base64.urlsafe_b64decode(pl)) + + +# 1. Get an M2M token via SDK +cfg = Config(host=f"https://{HOST}", client_id=CLIENT_ID, client_secret=CLIENT_SECRET) +provider = oauth_service_principal(cfg) +headers = provider() +token = headers["Authorization"].split(" ", 1)[1] + +claims = decode_jwt(token) +print("Token claims:") +print(f" iss: {claims.get('iss')}") +print(f" aud: {claims.get('aud')}") +print(f" sub: {claims.get('sub')}") +print(f" scope: {claims.get('scope')}") +print(f" exp: {claims.get('exp')}") +print() + +# 2. Make a minimal Thrift HiveServer2 OpenSession request, directly +# (no pysql TokenFederationProvider in the way) +# +# Thrift-over-HTTP uses the TBinaryProtocol framed messages. We can use +# the generated TCLIService Python stubs that come with pysql. +sys.path.insert(0, "src") +from databricks.sql.thrift_api.TCLIService import TCLIService, ttypes +from thrift.protocol.TBinaryProtocol import TBinaryProtocol +from thrift.transport.THttpClient import THttpClient + +url = f"https://{HOST}{HTTP_PATH}" +print(f"POST target: {url}") +print() + +transport = THttpClient(url) +transport.setCustomHeaders({ + "Authorization": f"Bearer {token}", + "X-Databricks-Thrift-Version": "0", +}) +protocol = TBinaryProtocol(transport) +client = TCLIService.Client(protocol) + +req = ttypes.TOpenSessionReq( + client_protocol=ttypes.TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V1, + username=None, +) + +print("Calling TCLIService.OpenSession directly over HTTPS...") +try: + resp = client.OpenSession(req) + print() + print(f"status: {resp.status.statusCode}") + print(f"errorMessage: {resp.status.errorMessage}") + if resp.status.statusCode in (ttypes.TStatusCode.SUCCESS_STATUS, ttypes.TStatusCode.SUCCESS_WITH_INFO_STATUS): + print(">>> Thrift DID accept the accounts-shaped iss token. Server is fine.") + print(">>> The earlier failure was inside pysql's TokenFederationProvider.") + else: + print(">>> Thrift returned non-success status. See errorMessage + infoMessages.") +except Exception as e: + print() + print(f"Exception: {type(e).__name__}: {e}") + print(">>> Server likely returned an HTTP error before Thrift got to respond.") + # Inspect transport for last response code + try: + print(f"HTTP status: {transport.code}") + print(f"HTTP headers: {dict(transport.headers) if transport.headers else {}}") + except Exception: + pass diff --git a/src/databricks/sql/backend/sea/backend.py b/src/databricks/sql/backend/sea/backend.py index ff130cd39..04c79a18b 100644 --- a/src/databricks/sql/backend/sea/backend.py +++ b/src/databricks/sql/backend/sea/backend.py @@ -188,8 +188,9 @@ def _extract_warehouse_id(self, http_path: str) -> str: ValueError: If the warehouse ID cannot be extracted from the path """ - warehouse_pattern = re.compile(r".*/warehouses/(.+)") - endpoint_pattern = re.compile(r".*/endpoints/(.+)") + # [^?&]+ stops at query params (e.g. ?o= for SPOG routing) + warehouse_pattern = re.compile(r".*/warehouses/([^?&]+)") + endpoint_pattern = re.compile(r".*/endpoints/([^?&]+)") for pattern in [warehouse_pattern, endpoint_pattern]: match = pattern.match(http_path) diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py index 2aeea175e..fe52f0c79 100755 --- a/src/databricks/sql/client.py +++ b/src/databricks/sql/client.py @@ -353,6 +353,7 @@ def read(self) -> Optional[OAuthToken]: host_url=self.session.host, batch_size=self.telemetry_batch_size, client_context=client_context, + extra_headers=self.session.get_spog_headers(), ) self._telemetry_client = TelemetryClientFactory.get_telemetry_client( diff --git a/src/databricks/sql/common/feature_flag.py b/src/databricks/sql/common/feature_flag.py index 36e4b8a02..0b2c7490b 100644 --- a/src/databricks/sql/common/feature_flag.py +++ b/src/databricks/sql/common/feature_flag.py @@ -113,6 +113,7 @@ def _refresh_flags(self): # Authenticate the request self._connection.session.auth_provider.add_headers(headers) headers["User-Agent"] = self._connection.session.useragent_header + headers.update(self._connection.session.get_spog_headers()) response = self._http_client.request( HttpMethod.GET, self._feature_flag_endpoint, headers=headers, timeout=30 diff --git a/src/databricks/sql/session.py b/src/databricks/sql/session.py index 1588d9f79..65c0d6aca 100644 --- a/src/databricks/sql/session.py +++ b/src/databricks/sql/session.py @@ -72,6 +72,14 @@ def __init__( base_headers = [("User-Agent", self.useragent_header)] all_headers = (http_headers or []) + base_headers + # Extract ?o= from http_path for SPOG routing. + # On SPOG hosts, the httpPath contains ?o= which routes Thrift + # requests via the URL. For SEA, telemetry, and feature flags (which use + # separate endpoints), we inject x-databricks-org-id as an HTTP header. + self._spog_headers = self._extract_spog_headers(http_path, all_headers) + if self._spog_headers: + all_headers = all_headers + list(self._spog_headers.items()) + self.ssl_options = SSLOptions( # Double negation is generally a bad thing, but we have to keep backward compatibility tls_verify=not kwargs.get( @@ -136,6 +144,44 @@ def _create_backend( } return databricks_client_class(**common_args) + @staticmethod + def _extract_spog_headers(http_path, existing_headers): + """Extract ?o= from http_path and return as a header dict for SPOG routing.""" + if not http_path or "?" not in http_path: + return {} + + from urllib.parse import parse_qs + + query_string = http_path.split("?", 1)[1] + params = parse_qs(query_string) + org_id = params.get("o", [None])[0] + if not org_id: + logger.debug( + "SPOG header extraction: http_path has query string but no ?o= param, " + "skipping x-databricks-org-id injection" + ) + return {} + + # Don't override if explicitly set + if any(k == "x-databricks-org-id" for k, _ in existing_headers): + logger.debug( + "SPOG header extraction: x-databricks-org-id already set by caller, " + "not overriding with ?o=%s from http_path", + org_id, + ) + return {} + + logger.debug( + "SPOG header extraction: injecting x-databricks-org-id=%s " + "(extracted from ?o= in http_path)", + org_id, + ) + return {"x-databricks-org-id": org_id} + + def get_spog_headers(self): + """Returns SPOG routing headers (x-databricks-org-id) if ?o= was in http_path.""" + return dict(self._spog_headers) + def open(self): self._session_id = self.backend.open_session( session_configuration=self.session_configuration, diff --git a/src/databricks/sql/telemetry/telemetry_client.py b/src/databricks/sql/telemetry/telemetry_client.py index 408162400..55d845e46 100644 --- a/src/databricks/sql/telemetry/telemetry_client.py +++ b/src/databricks/sql/telemetry/telemetry_client.py @@ -188,6 +188,7 @@ def __init__( executor, batch_size: int, client_context, + extra_headers: Optional[Dict[str, str]] = None, ) -> None: logger.debug("Initializing TelemetryClient for connection: %s", session_id_hex) self._telemetry_enabled = telemetry_enabled @@ -195,6 +196,7 @@ def __init__( self._session_id_hex = session_id_hex self._auth_provider = auth_provider self._user_agent = None + self._extra_headers = extra_headers or {} # OPTIMIZATION: Use lock-free Queue instead of list + lock # Queue is thread-safe internally and has better performance under concurrency @@ -287,6 +289,8 @@ def _send_telemetry(self, events): if self._auth_provider: self._auth_provider.add_headers(headers) + headers.update(self._extra_headers) + try: logger.debug("Submitting telemetry request to thread pool") @@ -587,6 +591,7 @@ def initialize_telemetry_client( host_url, batch_size, client_context, + extra_headers=None, ): """ Initialize a telemetry client for a specific connection if telemetry is enabled. @@ -627,6 +632,7 @@ def initialize_telemetry_client( executor=TelemetryClientFactory._executor, batch_size=batch_size, client_context=client_context, + extra_headers=extra_headers, ) TelemetryClientFactory._clients[ host_url diff --git a/tests/unit/test_sea_backend.py b/tests/unit/test_sea_backend.py index f71e60943..24a5e8242 100644 --- a/tests/unit/test_sea_backend.py +++ b/tests/unit/test_sea_backend.py @@ -143,6 +143,39 @@ def test_initialization(self, mock_http_client): ) assert client2.warehouse_id == "def456" + # Test with SPOG query param ?o= in http_path + client_spog = SeaDatabricksClient( + server_hostname="test-server.databricks.com", + port=443, + http_path="/sql/1.0/warehouses/abc123?o=6051921418418893", + http_headers=[], + auth_provider=AuthProvider(), + ssl_options=SSLOptions(), + ) + assert client_spog.warehouse_id == "abc123" + + # Test with SPOG query param on endpoints path + client_spog_ep = SeaDatabricksClient( + server_hostname="test-server.databricks.com", + port=443, + http_path="/sql/1.0/endpoints/def456?o=6051921418418893", + http_headers=[], + auth_provider=AuthProvider(), + ssl_options=SSLOptions(), + ) + assert client_spog_ep.warehouse_id == "def456" + + # Test with multiple query params + client_spog_multi = SeaDatabricksClient( + server_hostname="test-server.databricks.com", + port=443, + http_path="/sql/1.0/warehouses/abc123?o=123&extra=val", + http_headers=[], + auth_provider=AuthProvider(), + ssl_options=SSLOptions(), + ) + assert client_spog_multi.warehouse_id == "abc123" + # Test with custom max_download_threads client3 = SeaDatabricksClient( server_hostname="test-server.databricks.com", diff --git a/tests/unit/test_session.py b/tests/unit/test_session.py index aa7e7f02b..136c99e53 100644 --- a/tests/unit/test_session.py +++ b/tests/unit/test_session.py @@ -8,6 +8,7 @@ THandleIdentifier, ) from databricks.sql.backend.types import SessionId, BackendType +from databricks.sql.session import Session import databricks.sql @@ -226,3 +227,46 @@ def test_query_tags_dict_takes_precedence_over_session_config(self, mock_client_ call_kwargs = mock_client_class.return_value.open_session.call_args[1] assert call_kwargs["session_configuration"]["QUERY_TAGS"] == "team:new-team" + + +class TestSpogHeaders: + """Unit tests for SPOG header extraction from http_path.""" + + def test_extracts_org_id_from_query_param(self): + result = Session._extract_spog_headers( + "/sql/1.0/warehouses/abc123?o=6051921418418893", [] + ) + assert result == {"x-databricks-org-id": "6051921418418893"} + + def test_no_query_param_returns_empty(self): + result = Session._extract_spog_headers( + "/sql/1.0/warehouses/abc123", [] + ) + assert result == {} + + def test_no_o_param_returns_empty(self): + result = Session._extract_spog_headers( + "/sql/1.0/warehouses/abc123?other=value", [] + ) + assert result == {} + + def test_empty_http_path_returns_empty(self): + result = Session._extract_spog_headers("", []) + assert result == {} + + def test_none_http_path_returns_empty(self): + result = Session._extract_spog_headers(None, []) + assert result == {} + + def test_explicit_header_takes_precedence(self): + existing = [("x-databricks-org-id", "explicit-value")] + result = Session._extract_spog_headers( + "/sql/1.0/warehouses/abc123?o=6051921418418893", existing + ) + assert result == {} + + def test_multiple_query_params(self): + result = Session._extract_spog_headers( + "/sql/1.0/warehouses/abc123?o=12345&extra=val", [] + ) + assert result == {"x-databricks-org-id": "12345"}