/*--------------------------------------------------------------------------------------------- * Copyright (c) Microsoft Corporation. All rights reserved. *--------------------------------------------------------------------------------------------*/ /** * Python code generator for session-events and RPC types. */ import fs from "fs/promises"; import path from "path"; import type { JSONSchema7, JSONSchema7Definition } from "json-schema"; import { fileURLToPath } from "url"; import { cloneSchemaForCodegen, filterNodeByVisibility, fixNullableRequiredRefsInApiSchema, getApiSchemaPath, getRpcSchemaTypeName, getSessionEventsSchemaPath, isObjectSchema, isOpaqueJson, isVoidSchema, getNullableInner, isRpcMethod, isNodeFullyExperimental, isNodeFullyDeprecated, isSchemaDeprecated, isSchemaExperimental, isSchemaInternal, postProcessSchema, propagateInternalVisibility, collectInternalSymbols, collectInternalFieldsOnPublicTypes, annotateInternalPythonFields, renameInternalPythonSymbols, stripBooleanLiterals, writeGeneratedFile, collectDefinitionCollections, collectExperimentalOnlyRpcReferencedDefinitionNames, collectReachableDefinitionNames, collectRpcMethodReferencedDefinitionNames, findSharedSchemaDefinitions, hasSchemaPayload, parseExternalSchemaRef, refTypeName, resolveObjectSchema, resolveSchema, rewriteSharedDefinitionReferences, withSharedDefinitions, getSessionEventVariantSchemas, getSharedSessionEventEnvelopeProperties, getEnumValueDescriptions, loadSchemaJson, fixBrandCasing, type ApiSchema, type DefinitionCollections, type EnumValueDescriptions, type RpcMethod, type SessionEventEnvelopeProperty, } from "./utils.js"; // ── Utilities ─────────────────────────────────────────────────────────────── const EXTERNAL_SCHEMA_PY_MODULE: Record = { "session-events.schema.json": ".session_events", }; type PyExperimentalSubject = "type" | "enum" | "event"; function pyExperimentalComment(subject: PyExperimentalSubject, indent = ""): string { return `${indent}# Experimental: this ${subject} is part of an experimental API and may change or be removed.`; } function rewriteExternalRefsForPython(schema: JSONSchema7 & { definitions?: Record }): { placeholderNames: Map; imports: Map>; } { const placeholderNames = new Map(); const imports = new Map>(); const placeholderFor = (typeName: string): string => `__ExternalRef_${typeName}`; const visit = (value: unknown): void => { if (Array.isArray(value)) { for (const item of value) visit(item); return; } if (!value || typeof value !== "object") return; const node = value as Record; if (typeof node.$ref === "string" && !node.$ref.startsWith("#")) { const externalRef = parseExternalSchemaRef(node.$ref); const module = externalRef ? EXTERNAL_SCHEMA_PY_MODULE[externalRef.schemaFile] : undefined; if (externalRef && module) { const placeholder = placeholderFor(externalRef.definitionName); placeholderNames.set(placeholder, externalRef.definitionName); let bucket = imports.get(module); if (!bucket) { bucket = new Set(); imports.set(module, bucket); } bucket.add(externalRef.definitionName); node.$ref = `#/definitions/${placeholder}`; } } for (const child of Object.values(node)) visit(child); }; visit(schema); if (placeholderNames.size > 0) { if (!schema.definitions) schema.definitions = {}; for (const placeholder of placeholderNames.keys()) { if (!schema.definitions[placeholder]) { const markerProperty = `__externalRefMarker_${placeholder}`; schema.definitions[placeholder] = { type: "object", additionalProperties: false, title: placeholder, properties: { [markerProperty]: { type: "string" }, }, required: [markerProperty], }; } } } return { placeholderNames, imports }; } function placeholderToQuicktypeIdentifier(placeholder: string): string { return placeholder .replace(/^_+/, "") .split("_") .map((segment) => (segment ? segment[0].toUpperCase() + segment.slice(1) : "")) .join(""); } function placeholderToQuicktypeIdentifiers(placeholder: string): string[] { const basic = placeholderToQuicktypeIdentifier(placeholder); return [...new Set([basic, basic.replace(/Mcp/g, "MCP")])]; } function postProcessExternalRefsForPython( code: string, placeholderToReal: Map, externalEnumNames: Set = new Set() ): string { for (const [placeholder, realName] of placeholderToReal) { for (const quicktypeName of placeholderToQuicktypeIdentifiers(placeholder)) { code = code.replace( new RegExp( `(?:^|\\n)@dataclass\\r?\\nclass ${quicktypeName}\\b[\\s\\S]*?(?=\\n@dataclass\\b|\\nclass\\s+\\w|\\ndef\\s+\\w|$)`, "g" ), "\n" ); code = code.replace( new RegExp( `(?:^|\\n)class ${quicktypeName}\\w*\\(Enum\\):[\\s\\S]*?(?=\\nclass\\s+\\w|\\n@dataclass\\b|\\ndef\\s+\\w|$)`, "g" ), "\n" ); code = code.replace(new RegExp(`\\b${quicktypeName}\\b`, "g"), realName); } if (externalEnumNames.has(realName)) { code = code.replace(new RegExp(`\\b${realName}\\.from_dict\\b`, "g"), realName); code = code.replace( new RegExp(`to_class\\(${realName},\\s*([^)]+)\\)`, "g"), `to_enum(${realName}, $1)` ); } } return code.replace(/\n{3,}/g, "\n\n"); } function collectPythonExternalEnumNames( schema: JSONSchema7 | undefined, placeholderToReal: Map ): Set { const enumNames = new Set(); if (!schema) return enumNames; const definitions = collectDefinitionCollections(schema as Record); for (const realName of placeholderToReal.values()) { const definition = definitions.definitions[realName] ?? definitions.$defs[realName]; const resolved = definition ? resolveSchema(definition, definitions) ?? definition : undefined; if ( resolved?.enum && Array.isArray(resolved.enum) && resolved.enum.every((value) => typeof value === "string") ) { enumNames.add(realName); } } return enumNames; } function preservePythonRpcStringDateFields(definitions: Record): void { const quotaSnapshot = definitions.AccountQuotaSnapshot; const resetDate = quotaSnapshot?.properties?.resetDate as JSONSchema7 | undefined; if (resetDate?.type === "string" && resetDate.format === "date-time") { // Keep the existing Python API shape: AccountQuotaSnapshot.reset_date is an ISO string. delete resetDate.format; } } function collectExternalUnionAliasesForPython( definitions: Record, placeholderToReal: Map ): Map { const aliases = new Map(); for (const [definitionName, definition] of Object.entries(definitions)) { const variants = definition.anyOf ?? definition.oneOf; if (!Array.isArray(variants)) continue; const realNames: string[] = []; let allExternal = true; for (const variant of variants) { if (!variant || typeof variant !== "object") { allExternal = false; break; } const ref = (variant as JSONSchema7).$ref; if (!ref?.startsWith("#/definitions/")) { allExternal = false; break; } const placeholder = ref.slice("#/definitions/".length); const realName = placeholderToReal.get(placeholder); if (!realName) { allExternal = false; break; } realNames.push(realName); } if (allExternal && realNames.length > 0) { aliases.set(definitionName, realNames); } } return aliases; } function postProcessExternalUnionAliasesForPython(code: string, aliases: Map): string { for (const [aliasName, realNames] of aliases) { const aliasLine = `${aliasName} = ${realNames.join(" | ")}`; const classPattern = new RegExp( `(?:^|\\n)@dataclass\\r?\\nclass ${aliasName}\\b[\\s\\S]*?(?=\\n@dataclass\\b|\\nclass\\s+\\w|\\ndef\\s+\\w|$)`, "g" ); if (classPattern.test(code)) { code = code.replace(classPattern, `\n${aliasLine}\n`); } else if (!new RegExp(`^${aliasName}\\s*=`, "m").test(code)) { code = `${aliasLine}\n\n${code}`; } code = code.replace( new RegExp(`${aliasName}\\.from_dict`, "g"), `(lambda x: from_union([${realNames.map((name) => `${name}.from_dict`).join(", ")}], x))` ); code = code.replace( new RegExp(`to_class\\(${aliasName},\\s*([^)]+)\\)`, "g"), `from_union([${realNames.map((name) => `lambda x: to_class(${name}, x)`).join(", ")}], $1)` ); } return code.replace(/\n{3,}/g, "\n\n"); } /** * Replace flat-merged dataclasses emitted by quicktype for $ref-based * discriminated unions with proper Python unions: a `Name = VariantA | ...` * alias plus a `_load_Name(obj)` dispatcher. Rewrites `Name.from_dict(x)` and * `to_class(Name, x)` references to use the dispatcher / per-variant * `.to_dict()` so callers transparently get the proper union shape. * * Detection: walk top-level definitions and pick those whose schema is an * `anyOf`/`oneOf` of `$ref`s with a shared `const` discriminator. For each * such definition we expect quicktype to already have emitted both the * merged blob (which we'll delete) and the per-variant classes (which we * keep). * * Returns the rewritten types-section code and the list of resolved union * names; callers can re-apply `applyUnionRewritesToPython` to subsequently * generated code (e.g. RPC method wrappers) so `Name.from_dict(x)` calls * there also route through the new dispatcher. */ interface ResolvedRefBasedUnion { aliasName: string; discriminatorProp: string; dispatch: Array<{ value: string; typeName: string }>; } function postProcessRefBasedDiscriminatedUnionsForPython( code: string, definitions: Record, definitionCollections: DefinitionCollections ): { code: string; unions: ResolvedRefBasedUnion[] } { interface UnionInfo { aliasName: string; variantNames: string[]; discriminatorProp: string; dispatch: Array<{ value: string; typeName: string }>; description: string | undefined; } const unions: UnionInfo[] = []; for (const [defName, definition] of Object.entries(definitions)) { const variants = (definition.anyOf ?? definition.oneOf) as JSONSchema7[] | undefined; if (!Array.isArray(variants) || variants.length < 2) continue; if (!variants.every((v) => typeof v === "object" && v !== null && typeof v.$ref === "string")) { continue; } const variantRefNames = variants.map((v) => refTypeName(v.$ref as string, definitionCollections)); const resolvedVariants = variants.map( (v) => resolveObjectSchema(v, definitionCollections) ?? resolveSchema(v, definitionCollections) ?? v ); if (resolvedVariants.some((rv) => !rv || rv.properties === undefined)) continue; const discriminator = findPyDiscriminator(resolvedVariants as JSONSchema7[]); if (!discriminator) continue; const aliasName = toPascalCase(defName); const dispatch = variants.map((_, i) => { const discProp = (resolvedVariants[i].properties as Record)[ discriminator.property ]; return { value: String(discProp.const), typeName: toPascalCase(variantRefNames[i]), }; }); unions.push({ aliasName, variantNames: variantRefNames.map(toPascalCase), discriminatorProp: discriminator.property, dispatch, description: typeof definition.description === "string" ? definition.description : undefined, }); } const resolved: ResolvedRefBasedUnion[] = []; if (unions.length === 0) return { code, unions: resolved }; const emittedClassNames = new Set(); for (const match of code.matchAll(/^class (\w+)[:\(]/gm)) { emittedClassNames.add(match[1]); } const acronymCandidates = (name: string): string[] => { const substitutions: Array<[RegExp, string]> = [ [/Api/g, "API"], [/Mcp/g, "MCP"], [/Url/g, "URL"], [/Json/g, "JSON"], [/Http/g, "HTTP"], [/Hmac/g, "HMAC"], [/Tcp/g, "TCP"], [/Sql/g, "SQL"], [/Id\b/g, "ID"], [/Llm/g, "LLM"], [/Cli/g, "CLI"], ]; const results = new Set([name]); for (const [pattern, replacement] of substitutions) { for (const existing of [...results]) { results.add(existing.replace(pattern, replacement)); } } return [...results]; }; const resolveActualName = (expected: string): string | undefined => { for (const candidate of acronymCandidates(expected)) { if (emittedClassNames.has(candidate)) return candidate; } return undefined; }; for (const union of unions) { const actualAliasName = resolveActualName(union.aliasName); const actualVariantNames: string[] = []; const actualDispatch: Array<{ value: string; typeName: string }> = []; let allResolved = true; for (let i = 0; i < union.variantNames.length; i++) { const actual = resolveActualName(union.variantNames[i]); if (!actual) { allResolved = false; break; } actualVariantNames.push(actual); actualDispatch.push({ value: union.dispatch[i].value, typeName: actual }); } if (!allResolved || !actualAliasName) { continue; } resolved.push({ aliasName: actualAliasName, discriminatorProp: union.discriminatorProp, dispatch: actualDispatch, }); const lines = code.split("\n"); let classStart = -1; for (let i = 0; i < lines.length; i++) { if (lines[i] === `class ${actualAliasName}:` || lines[i].startsWith(`class ${actualAliasName}(`)) { classStart = i; break; } } if (classStart >= 0) { let blockStart = classStart; while ( blockStart > 0 && (lines[blockStart - 1] === "@dataclass" || /^# /.test(lines[blockStart - 1])) ) { blockStart--; } let blockEnd = classStart + 1; while (blockEnd < lines.length) { const ln = lines[blockEnd]; if ( /^class \w/.test(ln) || /^def \w/.test(ln) || ln === "@dataclass" || /^# (?:Experimental|Deprecated|Internal):/.test(ln) ) { break; } blockEnd++; } lines.splice(blockStart, blockEnd - blockStart); code = lines.join("\n"); } const aliasLine = union.description ? `# ${union.description.replace(/\n/g, " ")}\n${actualAliasName} = ${actualVariantNames.join(" | ")}` : `${actualAliasName} = ${actualVariantNames.join(" | ")}`; const dispatcherLines: string[] = []; dispatcherLines.push(`def _load_${actualAliasName}(obj: Any) -> "${actualAliasName}":`); dispatcherLines.push(` assert isinstance(obj, dict)`); dispatcherLines.push(` kind = obj.get(${JSON.stringify(union.discriminatorProp)})`); dispatcherLines.push(` match kind:`); for (const m of actualDispatch) { dispatcherLines.push(` case ${JSON.stringify(m.value)}: return ${m.typeName}.from_dict(obj)`); } dispatcherLines.push( ` case _: raise ValueError(f"Unknown ${actualAliasName} ${union.discriminatorProp}: {kind!r}")` ); code = `${code.trimEnd()}\n\n\n${aliasLine}\n\n\n${dispatcherLines.join("\n")}\n`; } code = applyUnionRewritesToPython(code, resolved); return { code, unions: resolved }; } /** * Rewrite occurrences of `Name.from_dict(...)` to `_load_Name(...)` and * `to_class(Name, x)` to `(x).to_dict()` for each union the caller passes in. * Safe to apply repeatedly — re-running on already-rewritten code is a no-op. */ function applyUnionRewritesToPython(code: string, unions: ResolvedRefBasedUnion[]): string { for (const union of unions) { code = code.replace( new RegExp(`\\b${union.aliasName}\\.from_dict\\b`, "g"), `_load_${union.aliasName}` ); code = code.replace( new RegExp(`to_class\\(${union.aliasName},\\s*([^,)]+)\\)`, "g"), `($1).to_dict()` ); } return code; } /** * For each discriminated-union variant class, replace the dataclass-level * discriminator field (e.g. ``kind: PermissionDecisionApproveOnceKind``) with * a class-level constant (e.g. ``kind: ClassVar[str] = "approve-once"``). * This lets users construct variants without supplying the discriminator * value (``PermissionDecisionApproveOnce()`` instead of * ``PermissionDecisionApproveOnce(kind=PermissionDecisionApproveOnceKind.APPROVE_ONCE)``), * matching the TS / Rust / .NET / Go ergonomics for the same schema. * * Also rewrites the generated ``from_dict`` to skip parsing the discriminator * (the dispatcher routed based on it; the variant class identity carries it) * and ``to_dict`` to emit the constant directly. */ function postProcessDiscriminatorDefaultsForPython( code: string, unions: ResolvedRefBasedUnion[] ): string { // Build variant lookup: variant class name → { prop, value }. const variantInfo = new Map(); for (const union of unions) { for (const d of union.dispatch) { // First-wins; multiple unions referencing the same variant share a // discriminator/value pair anyway. if (!variantInfo.has(d.typeName)) { variantInfo.set(d.typeName, { prop: union.discriminatorProp, value: d.value }); } } } if (variantInfo.size === 0) return code; const lines = code.split("\n"); const out: string[] = []; let usedClassVar = false; let i = 0; while (i < lines.length) { const line = lines[i]; const classMatch = line.match(/^class (\w+)[:\(]/); if (!classMatch) { out.push(line); i++; continue; } const className = classMatch[1]; const info = variantInfo.get(className); if (!info) { out.push(line); i++; continue; } // Find the bounds of this class block: everything indented under it. const classStart = i; let classEnd = i + 1; while (classEnd < lines.length) { const ln = lines[classEnd]; if ( /^class \w/.test(ln) || /^def \w/.test(ln) || ln === "@dataclass" || /^# (?:Experimental|Deprecated|Internal):/.test(ln) || ln.startsWith("@dataclass(") ) { break; } classEnd++; } const block = lines.slice(classStart, classEnd); // Locate the discriminator field declaration. Quicktype emits // ` kind: PermissionDecisionApproveOnceKind` while the // session-events codegen emits ` kind: str` — both match the // simple `: ` shape (no default value, since the // field is required in the schema). const fieldPattern = new RegExp(`^(\\s+)${info.prop}: [\\w\\[\\], ]+$`); let fieldIdx = -1; for (let j = 1; j < block.length; j++) { if (fieldPattern.test(block[j])) { fieldIdx = j; break; } } if (fieldIdx < 0) { // Variant class without an explicit discriminator field — leave alone. out.push(...block); i = classEnd; continue; } const fieldIndent = (block[fieldIdx].match(/^(\s+)/) ?? ["", ""])[1]; const literal = JSON.stringify(info.value); // Replace the field with a class-level constant. block[fieldIdx] = `${fieldIndent}${info.prop}: ClassVar[str] = ${literal}`; usedClassVar = true; // Drop any field-trailing docstring lines that immediately followed the // original field. Quicktype emits """..."""-style block strings; the // session-events codegen does not emit per-field docstrings. We only // touch the line at fieldIdx+1 if it's a docstring or blank. // (Conservative: leave additional lines in place; they don't reference // the dropped enum.) // Rewrite from_dict / to_dict bodies. for (let j = fieldIdx + 1; j < block.length; j++) { const ln = block[j]; // Drop ` = ...(obj.get(""))` parse line in from_dict. const propAssignPattern = new RegExp( `^\\s+${info.prop} = .+\\(obj\\.get\\(${JSON.stringify(info.prop)}\\)\\)` ); if (propAssignPattern.test(ln)) { block[j] = "<<>>"; continue; } // Drop multi-line constructor kwarg of the form ` kind=kind,` — // emitted by the session-events codegen when the constructor call // is broken across lines. const multilineKwargPattern = new RegExp( `^\\s+${info.prop}=${info.prop},?\\s*$` ); if (multilineKwargPattern.test(ln)) { block[j] = "<<>>"; continue; } // Convert `return X(a, prop, b)` (single-line positional) to drop // the prop arg. Quicktype-emitted constructors are single-line. const ctorMatch = ln.match(new RegExp(`^(\\s+)return ${className}\\((.*)\\)\\s*$`)); if (ctorMatch) { const argList = ctorMatch[2]; const args = splitTopLevelCommasMulti(argList); const filtered = args .map((a) => a.trim()) .filter((a) => { const kw = a.match(/^([a-zA-Z_]\w*)\s*=/); const name = kw ? kw[1] : a; return name !== info.prop; }); block[j] = `${ctorMatch[1]}return ${className}(${filtered.join(", ")})`; continue; } // Rewrite `result[""] = to_enum(, self.)` to // emit the class-level constant directly. const toDictPattern = new RegExp( `^(\\s+)result\\[${JSON.stringify(info.prop)}\\] = .+` ); if (toDictPattern.test(ln)) { const indent = (ln.match(/^(\s+)/) ?? ["", ""])[1]; block[j] = `${indent}result[${JSON.stringify(info.prop)}] = self.${info.prop}`; continue; } } out.push(...block.filter((l) => l !== "<<>>")); i = classEnd; } let result = out.join("\n"); if (usedClassVar) { result = ensureClassVarImport(result); } return result; } function splitTopLevelCommasMulti(s: string): string[] { const parts: string[] = []; let depth = 0; let start = 0; for (let i = 0; i < s.length; i++) { const c = s[i]; if (c === "(" || c === "[" || c === "{") depth++; else if (c === ")" || c === "]" || c === "}") depth--; else if (c === "," && depth === 0) { parts.push(s.slice(start, i)); start = i + 1; } } parts.push(s.slice(start)); return parts.filter((p) => p.trim().length > 0); } function ensureClassVarImport(code: string): string { // Already imported? if (/\bfrom typing import [^\n]*\bClassVar\b/.test(code)) return code; return code.replace( /^from typing import (.+)$/m, (_match, names) => { const list = names.split(",").map((n: string) => n.trim()).filter(Boolean); list.push("ClassVar"); list.sort(); return `from typing import ${[...new Set(list)].join(", ")}`; } ); } function pushPyExperimentalComment(lines: string[], subject: PyExperimentalSubject, indent = ""): void { lines.push(pyExperimentalComment(subject, indent)); } function pushPyExperimentalApiGroupComment(lines: string[]): void { lines.push("# Experimental: this API group is experimental and may change or be removed."); } /** * Emit `# Deprecated:` / `# Experimental:` / `# Internal:` comments above a * dataclass field. Order matches our other codegens (deprecated, experimental, * internal) and keeps the comments out of the field declaration itself. */ function pushPyFieldMarkers(lines: string[], propSchema: JSONSchema7 | null | undefined): void { if (!propSchema) return; if (isSchemaDeprecated(propSchema)) { lines.push(` # Deprecated: this field is deprecated.`); } if (isSchemaExperimental(propSchema)) { lines.push(` # Experimental: this field is part of an experimental API and may change or be removed.`); } if (isSchemaInternal(propSchema)) { lines.push(` # Internal: this field is an internal SDK API and is not part of the public surface.`); } } /** * Modernize quicktype's Python 3.7 output to Python 3.11+ syntax: * - Optional[T] → T | None * - List[T] → list[T] * - Dict[K, V] → dict[K, V] * - Type[T] → type[T] * - Callable from collections.abc instead of typing * - Clean up unused typing imports */ function replaceBalancedBrackets(code: string, prefix: string, replacer: (inner: string) => string): string { let result = ""; let i = 0; while (i < code.length) { const idx = code.indexOf(prefix + "[", i); if (idx === -1) { result += code.slice(i); break; } result += code.slice(i, idx); const start = idx + prefix.length + 1; // after '[' let depth = 1; let j = start; while (j < code.length && depth > 0) { if (code[j] === "[") depth++; else if (code[j] === "]") depth--; j++; } const inner = code.slice(start, j - 1); result += replacer(inner); i = j; } return result; } /** Split a string by commas, but only at the top bracket depth (ignores commas inside [...]) */ function splitTopLevelCommas(s: string): string[] { const parts: string[] = []; let depth = 0; let start = 0; for (let i = 0; i < s.length; i++) { if (s[i] === "[") depth++; else if (s[i] === "]") depth--; else if (s[i] === "," && depth === 0) { parts.push(s.slice(start, i)); start = i + 1; } } parts.push(s.slice(start)); return parts; } function pyDocstringLiteral(text: string): string { const normalized = text .split(/\r?\n/) .map((line) => line.replace(/\s+$/g, "")) .join("\n"); return JSON.stringify(normalized); } function rpcResultDescription(method: RpcMethod, resultSchema: JSONSchema7 | undefined): string | undefined { if (isVoidSchema(resultSchema)) return undefined; return method.result?.description ?? resultSchema?.description; } function rpcParamsDescription(method: RpcMethod, effectiveParams: JSONSchema7 | undefined): string | undefined { return method.params?.description ?? effectiveParams?.description; } function pushPyRpcMethodDocstring( lines: string[], indent: string, method: RpcMethod, options: { paramsName?: string; paramsDescription?: string; resultDescription?: string; deprecated?: boolean; experimental?: boolean; internal?: boolean; } = {} ): void { const sections: string[] = [method.description ?? `Calls ${method.rpcMethod}.`]; if (options.paramsName && options.paramsDescription) { sections.push(`Args:\n ${options.paramsName}: ${options.paramsDescription}`); } if (options.resultDescription) { sections.push(`Returns:\n ${options.resultDescription}`); } if (options.deprecated) { sections.push(".. deprecated:: This API is deprecated and will be removed in a future version."); } if (options.experimental) { sections.push(".. warning:: This API is experimental and may change or be removed in future versions."); } if (options.internal) { sections.push(":meta private:\n\nInternal SDK API; not part of the public surface."); } lines.push(`${indent}${pyDocstringLiteral(sections.join("\n\n"))}`); } function modernizePython(code: string): string { // Replace Optional[X] with X | None (handles arbitrarily nested brackets) code = replaceBalancedBrackets(code, "Optional", (inner) => `${inner} | None`); // Replace Union[X, Y] with X | Y (split only at top-level commas, not inside brackets) // Run iteratively to handle nested Union inside Dict/List let prev = ""; while (prev !== code) { prev = code; code = replaceBalancedBrackets(code, "Union", (inner) => { return splitTopLevelCommas(inner).map((s: string) => s.trim()).join(" | "); }); } // Replace List[X] with list[X] code = code.replace(/\bList\[/g, "list["); // Replace Dict[K, V] with dict[K, V] code = code.replace(/\bDict\[/g, "dict["); // Replace Type[T] with type[T] code = code.replace(/\bType\[/g, "type["); // Move Callable from typing to collections.abc code = code.replace( /from typing import (.*), Callable$/m, "from typing import $1\nfrom collections.abc import Callable" ); code = code.replace( /from typing import Callable, (.*)$/m, "from typing import $1\nfrom collections.abc import Callable" ); // Remove now-unused imports from typing (Optional, List, Dict, Type) code = code.replace(/from typing import (.+)$/m, (_match, imports: string) => { const items = imports.split(",").map((s: string) => s.trim()); const remove = new Set(["Optional", "List", "Dict", "Type", "Union"]); const kept = items.filter((i: string) => !remove.has(i)); return `from typing import ${kept.join(", ")}`; }); return code; } /** * Collapse lambdas that only forward their single argument into another callable. * This keeps the generated Python readable and avoids CodeQL "unnecessary lambda" findings. */ function unwrapRedundantPythonLambdas(code: string): string { return code.replace( /lambda\s+([A-Za-z_][A-Za-z0-9_]*)\s*:\s*((?:[A-Za-z_][A-Za-z0-9_]*)(?:\.[A-Za-z_][A-Za-z0-9_]*)*)\(\1\)/g, "$2" ); } function collapsePlaceholderPythonDataclasses(code: string, knownDefinitionNames?: Set): string { const classBlockRe = /(@dataclass\r?\nclass\s+(\w+):[\s\S]*?)(?=^@dataclass|^class\s+\w+|^def\s+\w+|\Z)/gm; const matches = [...code.matchAll(classBlockRe)].map((match) => ({ fullBlock: match[1], name: match[2], normalizedBody: normalizePythonDataclassBlock(match[1], match[2]), })); const groups = new Map(); for (const match of matches) { const group = groups.get(match.normalizedBody) ?? []; group.push(match); groups.set(match.normalizedBody, group); } for (const group of groups.values()) { if (group.length < 2) continue; const canonical = chooseCanonicalPlaceholderDuplicate(group.map(({ name }) => name), knownDefinitionNames); if (!canonical) continue; for (const duplicate of group) { if (duplicate.name === canonical) continue; // Only collapse types that quicktype invented (Class suffix or not // in the schema's named definitions). Preserve intentionally-named types. if (!isPlaceholderTypeName(duplicate.name) && knownDefinitionNames?.has(duplicate.name.toLowerCase())) continue; code = code.replace(duplicate.fullBlock, ""); code = code.replace(new RegExp(`\\b${duplicate.name}\\b`, "g"), canonical); } } return code.replace(/\n{3,}/g, "\n\n"); } /** * Reorder Python class/enum definitions so forward references are resolved. * Quicktype may emit classes in an order where a class references another * that hasn't been defined yet, causing NameError at import time. * This performs a topological sort of type definitions while preserving * the relative position of non-class blocks (functions, standalone code). */ function reorderPythonForwardRefs(code: string): string { // Split code into top-level blocks. Each block starts at an unindented // line that begins a class, decorated class, enum, or function definition. const lines = code.split("\n"); interface Block { name: string; code: string; isType: boolean; // true for class/enum definitions } const blocks: Block[] = []; let currentLines: string[] = []; let currentName: string | null = null; let isType = false; function flushBlock() { if (currentLines.length === 0) return; const blockCode = currentLines.join("\n"); blocks.push({ name: currentName ?? `__anon_${blocks.length}`, code: blockCode, isType, }); currentLines = []; currentName = null; isType = false; } for (let i = 0; i < lines.length; i++) { const line = lines[i]; const isTopLevel = line.length > 0 && line[0] !== " " && line[0] !== "\t"; if (isTopLevel) { const classMatch = line.match(/^class\s+(\w+)/); const defMatch = line.match(/^def\s+(\w+)/); const decoratorMatch = line === "@dataclass"; const commentMatch = line.startsWith("# "); if (classMatch) { // If previous block was just a decorator waiting for a class, merge if (currentLines.length > 0 && currentName === null && isType) { // This is the class line following @dataclass currentName = classMatch[1]; currentLines.push(line); continue; } flushBlock(); currentLines = [line]; currentName = classMatch[1]; isType = true; } else if (decoratorMatch) { flushBlock(); currentLines = [line]; isType = true; } else if (defMatch) { flushBlock(); currentLines = [line]; currentName = defMatch[1]; isType = false; } else if (commentMatch && currentLines.length === 0) { // Standalone comment — attach to next block currentLines = [line]; } else { currentLines.push(line); } } else { currentLines.push(line); } } flushBlock(); if (blocks.length === 0) return code; // Collect all type names (classes and enums) const typeNames = new Set(blocks.filter((b) => b.isType).map((b) => b.name)); if (typeNames.size === 0) return code; // Build dependency graph: for each type block, find references to other type names const deps = new Map>(); for (const block of blocks) { if (!block.isType) continue; const blockDeps = new Set(); for (const tn of typeNames) { if (tn === block.name) continue; if (new RegExp(`\\b${tn}\\b`).test(block.code)) { blockDeps.add(tn); } } deps.set(block.name, blockDeps); } // Kahn's algorithm for topological sort const inDegree = new Map(); for (const tn of typeNames) inDegree.set(tn, deps.get(tn)?.size ?? 0); const dependents = new Map(); for (const tn of typeNames) dependents.set(tn, []); for (const [name, d] of deps) { for (const dep of d) { dependents.get(dep)!.push(name); } } const queue: string[] = []; for (const [tn, deg] of inDegree) { if (deg === 0) queue.push(tn); } const sorted: string[] = []; while (queue.length > 0) { const node = queue.shift()!; sorted.push(node); for (const dep of dependents.get(node) ?? []) { const newDeg = inDegree.get(dep)! - 1; inDegree.set(dep, newDeg); if (newDeg === 0) queue.push(dep); } } // If there are cycles, keep remaining nodes in original order for (const block of blocks) { if (block.isType && !sorted.includes(block.name)) { sorted.push(block.name); } } // Rebuild: place type blocks in sorted order at the positions // where type blocks originally appeared const typeBlockMap = new Map(blocks.filter((b) => b.isType).map((b) => [b.name, b])); let sortIdx = 0; const result: string[] = []; for (const block of blocks) { if (block.isType) { result.push(typeBlockMap.get(sorted[sortIdx])!.code); sortIdx++; } else { result.push(block.code); } } return result.join("\n"); } function normalizePythonDataclassBlock(block: string, name: string): string { return block .replace(/^@dataclass\r?\nclass\s+\w+:/, "@dataclass\nclass:") .replace(new RegExp(`\\b${name}\\b`, "g"), "SelfType") .split(/\r?\n/) .map((line) => line.trim()) .filter((line) => line.length > 0) .join("\n"); } function chooseCanonicalPlaceholderDuplicate(names: string[], knownDefinitionNames?: Set): string | undefined { // Prefer the name that matches a schema definition — it's intentionally named. if (knownDefinitionNames) { const definedName = names.find((name) => knownDefinitionNames.has(name.toLowerCase())); if (definedName) return definedName; } // Fallback for Class-suffix placeholders: pick the non-placeholder name. const specificNames = names.filter((name) => !isPlaceholderTypeName(name)); if (specificNames.length === 0) return undefined; return specificNames[0]; } function isPlaceholderTypeName(name: string): boolean { return name.endsWith("Class") || name.endsWith("Enum"); } function toSnakeCase(s: string): string { return s .replace(/([a-z])([A-Z])/g, "$1_$2") .replace(/[._]/g, "_") .toLowerCase(); } function stripDurationMillisecondsSuffix(name: string): string { if (name.length > 2 && name.endsWith("Ms") && /[a-z]/.test(name.charAt(name.length - 3))) { return name.slice(0, -2); } return name; } function isSecondsDurationPropertyName(propName: string | undefined): boolean { return propName !== undefined && /seconds$/i.test(propName); } function isPyDurationProperty(propSchema: JSONSchema7, ctx: PyCodegenCtx): boolean { if (propSchema.$ref && typeof propSchema.$ref === "string") { const resolved = resolveSchema(propSchema, ctx.definitions); if (resolved && resolved !== propSchema) { return isPyDurationProperty(resolved, ctx); } } if (propSchema.allOf && propSchema.allOf.length === 1 && typeof propSchema.allOf[0] === "object") { return isPyDurationProperty(propSchema.allOf[0] as JSONSchema7, ctx); } if (propSchema.anyOf) { const variants = (propSchema.anyOf as JSONSchema7[]) .filter((item) => typeof item === "object") .map( (item) => resolveSchema(item as JSONSchema7, ctx.definitions) ?? (item as JSONSchema7) ); const nonNull = variants.filter((item) => !isPyNullLikeSchema(item)); return nonNull.length === 1 && isPyDurationProperty(nonNull[0], ctx); } if (propSchema.format !== "duration") { return false; } const type = propSchema.type; if (type === "number" || type === "integer") { return true; } if (Array.isArray(type)) { const nonNullTypes = type.filter((value) => value !== "null"); return nonNullTypes.length === 1 && (nonNullTypes[0] === "number" || nonNullTypes[0] === "integer"); } return false; } function toPyFieldName(propName: string, propSchema: JSONSchema7, ctx: PyCodegenCtx): string { return toSnakeCase(isPyDurationProperty(propSchema, ctx) ? stripDurationMillisecondsSuffix(propName) : propName); } function escapeRegExp(value: string): string { return value.replace(/[.*+?^${}()|[\]\\]/g, "\\$&"); } function removeRequiredAnyDefaultsForPython( code: string, definitions: Record, definitionCollections: DefinitionCollections ): string { const requiredFieldsByClass = new Map>(); for (const [definitionName, schema] of Object.entries(definitions)) { const resolved = resolveObjectSchema(schema, definitionCollections) ?? resolveSchema(schema, definitionCollections); if (!resolved || !isObjectSchema(resolved) || !resolved.properties || !Array.isArray(resolved.required)) { continue; } const requiredFields = resolved.required.map(toSnakeCase); for (const className of new Set([definitionName, toPascalCase(definitionName)])) { const fields = requiredFieldsByClass.get(className) ?? new Set(); for (const field of requiredFields) { fields.add(field); } requiredFieldsByClass.set(className, fields); } } const classBlockRe = /(@dataclass\r?\nclass\s+(\w+):[\s\S]*?)(?=^@dataclass|^class\s+\w|^def\s+\w|\Z)/gm; return code.replace(classBlockRe, (block: string, _classPrefix: string, className: string) => { const requiredFields = requiredFieldsByClass.get(className); if (!requiredFields) { return block; } let updatedBlock = block; for (const field of requiredFields) { updatedBlock = updatedBlock.replace(new RegExp(`^( ${escapeRegExp(field)}: Any) = None$`, "m"), "$1"); } return updatedBlock; }); } function toPascalCase(s: string): string { return fixBrandCasing( s .split(/[._]/) .map((w) => w.charAt(0).toUpperCase() + w.slice(1)) .join("") ); } function collectRpcMethods(node: Record): RpcMethod[] { const results: RpcMethod[] = []; for (const value of Object.values(node)) { if (isRpcMethod(value)) { results.push(value); } else if (typeof value === "object" && value !== null) { results.push(...collectRpcMethods(value as Record)); } } return results; } let rpcDefinitions: DefinitionCollections = { definitions: {}, $defs: {} }; function withRootTitle(schema: JSONSchema7, title: string): JSONSchema7 { return { ...schema, title }; } function pythonRequestFallbackName(method: RpcMethod): string { return toPascalCase(method.rpcMethod) + "Request"; } function schemaSourceForNamedDefinition( schema: JSONSchema7 | null | undefined, resolvedSchema: JSONSchema7 | undefined ): JSONSchema7 { if (schema?.$ref && resolvedSchema) { return resolvedSchema; } // When the schema is an anyOf/oneOf wrapper (e.g., Zod optional params producing // `anyOf: [{ not: {} }, { $ref }]`), use the resolved object schema to avoid // generating self-referential type aliases that crash quicktype. if ((schema?.anyOf || schema?.oneOf) && resolvedSchema?.properties) { return resolvedSchema; } return schema ?? resolvedSchema ?? { type: "object" }; } function isNamedPyObjectSchema(schema: JSONSchema7 | undefined): schema is JSONSchema7 { return !!schema && schema.type === "object" && (schema.properties !== undefined || schema.additionalProperties === false); } function getMethodResultSchema(method: RpcMethod): JSONSchema7 | undefined { return resolveSchema(method.result, rpcDefinitions) ?? method.result ?? undefined; } function isPythonObjectResultSchema(schema: JSONSchema7 | undefined): boolean { if (!schema) return false; if (isObjectSchema(schema)) return true; const variants = schema.anyOf ?? schema.oneOf; if (!Array.isArray(variants)) return false; const nonNullVariants = variants .filter((variant): variant is JSONSchema7 => typeof variant === "object" && variant !== null) .map((variant) => resolveObjectSchema(variant, rpcDefinitions) ?? resolveSchema(variant, rpcDefinitions) ?? variant) .filter( (variant) => variant.type !== "null" && !( typeof variant.not === "object" && variant.not !== null && Object.keys(variant.not).length === 0 ) ); if (nonNullVariants.length === 1) { return isPythonObjectResultSchema(nonNullVariants[0]); } return nonNullVariants.length > 1 && findPyDiscriminator(nonNullVariants) !== null; } function getMethodParamsSchema(method: RpcMethod): JSONSchema7 | undefined { return ( resolveObjectSchema(method.params, rpcDefinitions) ?? resolveSchema(method.params, rpcDefinitions) ?? method.params ?? undefined ); } function pythonResultTypeName(method: RpcMethod, schemaOverride?: JSONSchema7): string { const schema = schemaOverride ?? getMethodResultSchema(method); // If schema is a $ref, derive the type name from the ref path if (schema?.$ref) { const refName = schema.$ref.split("/").pop(); if (refName) return toPascalCase(refName); } return getRpcSchemaTypeName(schema, toPascalCase(method.rpcMethod) + "Result"); } /** Detect the Zod optional params pattern: `anyOf: [{ not: {} }, { $ref }]` */ function isParamsOptional(method: RpcMethod): boolean { const schema = method.params; if (!schema?.anyOf) return false; return schema.anyOf.some( (item) => typeof item === "object" && (item as JSONSchema7).not !== undefined && typeof (item as JSONSchema7).not === "object" && Object.keys((item as JSONSchema7).not as object).length === 0 ); } function pythonParamsTypeName(method: RpcMethod): string { const fallback = pythonRequestFallbackName(method); if (method.rpcMethod.startsWith("session.") && method.params?.$ref) { return fallback; } const schema = getMethodParamsSchema(method); if (schema?.$ref) return toPascalCase(refTypeName(schema.$ref, rpcDefinitions)); return getRpcSchemaTypeName(schema, fallback); } // ── Session Events ────────────────────────────────────────────────────────── // ── Session Events (custom codegen — dedicated per-event payload types) ───── interface PyEventVariant { typeName: string; dataClassName: string; dataSchema: JSONSchema7; dataDescription?: string; eventExperimental: boolean; dataExperimental: boolean; } interface PyEventEnvelopeProperty extends SessionEventEnvelopeProperty { jsonName: string; fieldName: string; hasDefault: boolean; resolved: PyResolvedType; } interface PyResolvedType { annotation: string; fromExpr: (expr: string) => string; toExpr: (expr: string) => string; } interface PyCodegenCtx { classes: string[]; aliases: string[]; aliasesByName: Set; enums: string[]; enumsByName: Map; generatedNames: Set; usesTimedelta: boolean; usesIntegerTimedelta: boolean; definitions: DefinitionCollections; refBasedUnions: ResolvedRefBasedUnion[]; } function toEnumMemberName(value: string): string { const cleaned = value .replace(/([a-z])([A-Z])/g, "$1_$2") .replace(/[^A-Za-z0-9]+/g, "_") .replace(/^_+|_+$/g, "") .toUpperCase(); if (!cleaned) { return "VALUE"; } return /^[0-9]/.test(cleaned) ? `VALUE_${cleaned}` : cleaned; } function wrapParser(resolved: PyResolvedType, arg = "x"): string { return `lambda ${arg}: ${resolved.fromExpr(arg)}`; } function wrapSerializer(resolved: PyResolvedType, arg = "x"): string { return `lambda ${arg}: ${resolved.toExpr(arg)}`; } const PY_SESSION_EVENT_TYPE_RENAMES: Record = { AssistantMessageDataToolRequestsItem: "AssistantMessageToolRequest", AssistantMessageDataToolRequestsItemType: "AssistantMessageToolRequestType", AssistantUsageDataCopilotUsage: "AssistantUsageCopilotUsage", AssistantUsageDataCopilotUsageTokenDetailsItem: "AssistantUsageCopilotUsageTokenDetail", AssistantUsageDataQuotaSnapshotsValue: "AssistantUsageQuotaSnapshot", CapabilitiesChangedDataUi: "CapabilitiesChangedUI", CommandsChangedDataCommandsItem: "CommandsChangedCommand", ElicitationCompletedDataAction: "ElicitationCompletedAction", ElicitationRequestedDataMode: "ElicitationRequestedMode", ElicitationRequestedDataRequestedSchema: "ElicitationRequestedSchema", McpOauthRequiredDataStaticClientConfig: "MCPOauthRequiredStaticClientConfig", PermissionCompletedDataResultKind: "PermissionCompletedKind", PermissionRequestedDataPermissionRequest: "PermissionRequest", PermissionRequestedDataPermissionRequestAction: "PermissionRequestMemoryAction", PermissionRequestedDataPermissionRequestCommandsItem: "PermissionRequestShellCommand", PermissionRequestedDataPermissionRequestDirection: "PermissionRequestMemoryDirection", PermissionRequestedDataPermissionRequestPossibleUrlsItem: "PermissionRequestShellPossibleURL", SessionCompactionCompleteDataCompactionTokensUsed: "CompactionCompleteCompactionTokensUsed", SessionCustomAgentsUpdatedDataAgentsItem: "CustomAgentsUpdatedAgent", SessionExtensionsLoadedDataExtensionsItem: "ExtensionsLoadedExtension", SessionExtensionsLoadedDataExtensionsItemSource: "ExtensionsLoadedExtensionSource", SessionExtensionsLoadedDataExtensionsItemStatus: "ExtensionsLoadedExtensionStatus", SessionHandoffDataRepository: "HandoffRepository", SessionHandoffDataSourceType: "HandoffSourceType", SessionMcpServersLoadedDataServersItem: "MCPServersLoadedServer", SessionMcpServersLoadedDataServersItemStatus: "MCPServerStatus", SessionShutdownDataCodeChanges: "ShutdownCodeChanges", SessionShutdownDataModelMetricsValue: "ShutdownModelMetric", SessionShutdownDataModelMetricsValueRequests: "ShutdownModelMetricRequests", SessionShutdownDataModelMetricsValueUsage: "ShutdownModelMetricUsage", SessionShutdownDataShutdownType: "ShutdownType", SessionSkillsLoadedDataSkillsItem: "SkillsLoadedSkill", UserMessageDataAgentMode: "UserMessageAgentMode", }; function postProcessPythonSessionEventCode(code: string): string { for (const [from, to] of Object.entries(PY_SESSION_EVENT_TYPE_RENAMES).sort( ([left], [right]) => right.length - left.length )) { code = code.replace(new RegExp(`\\b${from}\\b`, "g"), to); } return unwrapRedundantPythonLambdas(code); } function pyPrimitiveResolvedType(annotation: string, fromFn: string, toFn = fromFn): PyResolvedType { return { annotation, fromExpr: (expr) => `${fromFn}(${expr})`, toExpr: (expr) => `${toFn}(${expr})`, }; } function pyOptionalResolvedType(inner: PyResolvedType): PyResolvedType { return { annotation: `${inner.annotation} | None`, fromExpr: (expr) => `from_union([from_none, ${wrapParser(inner)}], ${expr})`, toExpr: (expr) => `from_union([from_none, ${wrapSerializer(inner)}], ${expr})`, }; } function pyAnyResolvedType(): PyResolvedType { return { annotation: "Any", fromExpr: (expr) => expr, toExpr: (expr) => expr, }; } function pyDurationResolvedType(ctx: PyCodegenCtx, isInteger: boolean): PyResolvedType { ctx.usesTimedelta = true; if (isInteger) { ctx.usesIntegerTimedelta = true; } return { annotation: "timedelta", fromExpr: (expr) => `from_timedelta(${expr})`, toExpr: (expr) => (isInteger ? `to_timedelta_int(${expr})` : `to_timedelta(${expr})`), }; } /** * Emit a "$ref-based discriminated union" — a Python equivalent of the * polymorphic hierarchies that TS / Rust / .NET / Go produce for the same * schema shape. Given a definition like * * "PermissionRequest": { "anyOf": [ {"$ref": "#/.../PermissionRequestShell"}, ... ] } * * where every variant is a `$ref` to a sibling definition and the variants * share a `const` discriminator property (e.g. `kind`), emit each variant as a * standalone `@dataclass`, plus a union alias and a `from_dict` dispatcher. * * Returns the resolved type or `undefined` if the schema doesn't match the * expected shape (caller falls back to other paths). */ function tryEmitPyRefBasedDiscriminatedUnion( aliasName: string, resolved: JSONSchema7, ctx: PyCodegenCtx ): PyResolvedType | undefined { const variants = (resolved.anyOf ?? resolved.oneOf) as JSONSchema7[] | undefined; if (!Array.isArray(variants) || variants.length < 2) return undefined; const variantRefNames: string[] = []; for (const v of variants) { if (!v || typeof v !== "object") return undefined; const ref = (v as JSONSchema7).$ref; if (typeof ref !== "string" || !ref.startsWith("#/definitions/")) { return undefined; } variantRefNames.push(refTypeName(ref, ctx.definitions)); } const resolvedVariants = variants.map( (v) => resolveObjectSchema(v, ctx.definitions) ?? resolveSchema(v, ctx.definitions) ?? (v as JSONSchema7) ); if (resolvedVariants.some((rv) => !rv || rv.properties === undefined)) { return undefined; } const discriminator = findPyDiscriminator(resolvedVariants as JSONSchema7[]); if (!discriminator) return undefined; const variantTypeNames: string[] = []; const dispatch: Array<{ value: string; typeName: string }> = []; for (let i = 0; i < variants.length; i++) { const variantTypeName = toPascalCase(variantRefNames[i]); const variantSchema = resolveObjectSchema(variants[i], ctx.definitions); if (variantSchema) { emitPyClass(variantTypeName, variantSchema, ctx, variantSchema.description); } variantTypeNames.push(variantTypeName); const discProp = resolvedVariants[i].properties?.[discriminator.property] as JSONSchema7; dispatch.push({ value: String(discProp.const), typeName: variantTypeName }); } if (!ctx.aliasesByName.has(aliasName)) { const lines: string[] = []; if (resolved.description) { lines.push(`# ${resolved.description}`); } lines.push(`${aliasName} = ${variantTypeNames.join(" | ")}`); ctx.aliasesByName.add(aliasName); ctx.aliases.push(lines.join("\n")); ctx.refBasedUnions.push({ aliasName, discriminatorProp: discriminator.property, dispatch, }); } const dispatcherName = `_load_${aliasName}`; if (!ctx.generatedNames.has(dispatcherName)) { ctx.generatedNames.add(dispatcherName); const lines: string[] = []; lines.push(`def ${dispatcherName}(obj: Any) -> "${aliasName}":`); lines.push(` assert isinstance(obj, dict)`); lines.push(` kind = obj.get(${JSON.stringify(discriminator.property)})`); lines.push(` match kind:`); for (const m of dispatch) { lines.push( ` case ${JSON.stringify(m.value)}: return ${m.typeName}.from_dict(obj)` ); } lines.push( ` case _: raise ValueError(f"Unknown ${aliasName} ${discriminator.property}: {kind!r}")` ); ctx.classes.push(lines.join("\n")); } return { annotation: aliasName, fromExpr: (expr) => `${dispatcherName}(${expr})`, toExpr: (expr) => `${expr}.to_dict()`, }; } function isPyBase64StringSchema(schema: JSONSchema7): boolean { return schema.format === "byte" || (schema as Record).contentEncoding === "base64"; } function extractPyEventVariants(schema: JSONSchema7): PyEventVariant[] { const definitionCollections = collectDefinitionCollections(schema as Record); return getSessionEventVariantSchemas(schema, definitionCollections) .map((variant) => { const typeSchema = variant.properties!.type as JSONSchema7; const typeName = typeSchema?.const as string; if (!typeName) { throw new Error("Event variant must define type.const"); } const dataSchema = resolveObjectSchema(variant.properties!.data as JSONSchema7, definitionCollections) ?? resolveSchema(variant.properties!.data as JSONSchema7, definitionCollections) ?? ((variant.properties!.data as JSONSchema7) || {}); return { typeName, dataClassName: `${toPascalCase(typeName)}Data`, dataSchema, dataDescription: dataSchema.description, eventExperimental: isSchemaExperimental(variant), dataExperimental: isSchemaExperimental(dataSchema), }; }); } function getPySharedEventEnvelopeProperties(schema: JSONSchema7, ctx: PyCodegenCtx): PyEventEnvelopeProperty[] { return getSharedSessionEventEnvelopeProperties(schema, ctx.definitions) .map((property) => { const { name, schema, required } = property; const resolved = resolvePyPropertyType(schema, "SessionEvent", name, required, ctx); return { ...property, jsonName: name, fieldName: toPyFieldName(name, schema, ctx), required, hasDefault: !required || resolved.annotation.includes(" | None"), resolved, }; }); } function findPyDiscriminator( variants: JSONSchema7[] ): { property: string; mapping: Map } | null { if (variants.length === 0) { return null; } const firstVariant = variants[0]; if (!firstVariant.properties) { return null; } for (const [propName, propSchema] of Object.entries(firstVariant.properties)) { if (typeof propSchema !== "object") { continue; } if ((propSchema as JSONSchema7).const === undefined) { continue; } const mapping = new Map(); let valid = true; for (const variant of variants) { if (!variant.properties) { valid = false; break; } const variantProp = variant.properties[propName]; if (typeof variantProp !== "object" || (variantProp as JSONSchema7).const === undefined) { valid = false; break; } mapping.set(String((variantProp as JSONSchema7).const), variant); } if (valid && mapping.size === variants.length) { return { property: propName, mapping }; } } return null; } function isPyNullLikeSchema(schema: JSONSchema7): boolean { return schema.type === "null" || (typeof schema.not === "object" && schema.not !== null && Object.keys(schema.not).length === 0); } function getPyNamedSchemaType( schema: JSONSchema7, ctx: PyCodegenCtx ): { typeName: string; resolved: PyResolvedType } | undefined { const resolved = resolveSchema(schema, ctx.definitions) ?? schema; const typeName = schema.$ref ? toPascalCase(refTypeName(schema.$ref, ctx.definitions)) : typeof resolved.title === "string" ? resolved.title : undefined; if (!typeName) { return undefined; } if (resolved.enum && Array.isArray(resolved.enum) && resolved.enum.every((value) => typeof value === "string")) { const enumType = getOrCreatePyEnum( typeName, resolved.enum as string[], ctx, resolved.description, getEnumValueDescriptions(resolved), isSchemaDeprecated(resolved), isSchemaExperimental(resolved) ); return { typeName: enumType, resolved: { annotation: enumType, fromExpr: (expr) => `parse_enum(${enumType}, ${expr})`, toExpr: (expr) => `to_enum(${enumType}, ${expr})`, }, }; } const resolvedObject = resolveObjectSchema(schema, ctx.definitions) ?? resolveObjectSchema(resolved, ctx.definitions); if (isNamedPyObjectSchema(resolvedObject)) { emitPyClass(typeName, resolvedObject, ctx, resolvedObject.description); return { typeName, resolved: { annotation: typeName, fromExpr: (expr) => `${typeName}.from_dict(${expr})`, toExpr: (expr) => `to_class(${typeName}, ${expr})`, }, }; } return undefined; } function getOrCreatePyUnionAlias( aliasName: string, members: string[], ctx: PyCodegenCtx, description?: string ): string { if (!ctx.aliasesByName.has(aliasName)) { const lines: string[] = []; if (description) { lines.push(`# ${description}`); } lines.push(`${aliasName} = ${members.join(" | ")}`); ctx.aliasesByName.add(aliasName); ctx.aliases.push(lines.join("\n")); } return aliasName; } function resolvePyNamedUnion( typeName: string, schemas: JSONSchema7[], ctx: PyCodegenCtx, description?: string ): PyResolvedType | undefined { const members = schemas .filter((schema) => !isPyNullLikeSchema(schema)) .map((schema) => getPyNamedSchemaType(schema, ctx)); if (members.length === 0 || members.some((member) => member === undefined)) { return undefined; } const namedMembers = members as Array<{ typeName: string; resolved: PyResolvedType }>; const aliasName = getOrCreatePyUnionAlias( typeName, namedMembers.map((member) => member.typeName), ctx, description ); return { annotation: aliasName, fromExpr: (expr) => `from_union([${namedMembers.map((member) => member.resolved.fromExpr).map((fromExpr) => fromExpr("x")).map((expr) => `lambda x: ${expr}`).join(", ")}], ${expr})`, toExpr: (expr) => `from_union([${namedMembers.map((member) => member.resolved.toExpr).map((toExpr) => toExpr("x")).map((expr) => `lambda x: ${expr}`).join(", ")}], ${expr})`, }; } function getOrCreatePyEnum( enumName: string, values: string[], ctx: PyCodegenCtx, description?: string, enumValueDescriptions?: EnumValueDescriptions, deprecated?: boolean, experimental?: boolean ): string { const existing = ctx.enumsByName.get(enumName); if (existing) { return existing; } const lines: string[] = []; if (experimental) { pushPyExperimentalComment(lines, "enum"); } if (deprecated) { lines.push(`# Deprecated: this enum is deprecated and will be removed in a future version.`); } if (description) { lines.push(`class ${enumName}(Enum):`); lines.push(` ${pyDocstringLiteral(description)}`); } else { lines.push(`class ${enumName}(Enum):`); } for (const value of values) { const valueDescription = enumValueDescriptions?.[value]; if (valueDescription) { for (const line of valueDescription.split(/\r?\n/)) { lines.push(` # ${line.trimEnd()}`); } } lines.push(` ${toEnumMemberName(value)} = ${JSON.stringify(value)}`); } ctx.enumsByName.set(enumName, enumName); ctx.enums.push(lines.join("\n")); return enumName; } function resolvePyPropertyType( propSchema: JSONSchema7, parentTypeName: string, jsonPropName: string, isRequired: boolean, ctx: PyCodegenCtx ): PyResolvedType { const fallbackName = parentTypeName + toPascalCase(jsonPropName); const nestedName = typeof propSchema.title === "string" ? propSchema.title : fallbackName; if (propSchema.$ref && typeof propSchema.$ref === "string") { const typeName = toPascalCase(refTypeName(propSchema.$ref, ctx.definitions)); const resolved = resolveSchema(propSchema, ctx.definitions); if (resolved && resolved !== propSchema) { if (resolved.enum && Array.isArray(resolved.enum) && resolved.enum.every((value) => typeof value === "string")) { const enumType = getOrCreatePyEnum(typeName, resolved.enum as string[], ctx, resolved.description, getEnumValueDescriptions(resolved), isSchemaDeprecated(resolved), isSchemaExperimental(resolved)); const enumResolved: PyResolvedType = { annotation: enumType, fromExpr: (expr) => `parse_enum(${enumType}, ${expr})`, toExpr: (expr) => `to_enum(${enumType}, ${expr})`, }; return isRequired ? enumResolved : pyOptionalResolvedType(enumResolved); } // Emit "$ref"-based discriminated unions as proper Python unions // (per-variant dataclasses + alias + dispatcher) rather than flat // merged dataclasses. Matches the polymorphic hierarchies emitted // by the TS / Rust / .NET / Go SDKs for the same schema shape. if (resolved.anyOf || resolved.oneOf) { const unionResolved = tryEmitPyRefBasedDiscriminatedUnion(typeName, resolved, ctx); if (unionResolved) { return isRequired ? unionResolved : pyOptionalResolvedType(unionResolved); } } const resolvedObject = resolveObjectSchema(propSchema, ctx.definitions); if (isNamedPyObjectSchema(resolvedObject)) { emitPyClass(typeName, resolvedObject, ctx, resolvedObject.description); const objectResolved: PyResolvedType = { annotation: typeName, fromExpr: (expr) => `${typeName}.from_dict(${expr})`, toExpr: (expr) => `to_class(${typeName}, ${expr})`, }; return isRequired ? objectResolved : pyOptionalResolvedType(objectResolved); } return resolvePyPropertyType(resolved, parentTypeName, jsonPropName, isRequired, ctx); } } if (propSchema.allOf && propSchema.allOf.length === 1 && typeof propSchema.allOf[0] === "object") { return resolvePyPropertyType( propSchema.allOf[0] as JSONSchema7, parentTypeName, jsonPropName, isRequired, ctx ); } if (propSchema.anyOf) { const variantSchemas = (propSchema.anyOf as JSONSchema7[]) .filter((item) => typeof item === "object") .map((item) => item as JSONSchema7); const variants = variantSchemas .map( (item) => resolveObjectSchema(item, ctx.definitions) ?? resolveSchema(item, ctx.definitions) ?? item ); const nonNull = variants.filter((item) => !isPyNullLikeSchema(item)); const hasNull = variants.length !== nonNull.length; if (nonNull.length === 1) { const inner = resolvePyPropertyType(nonNull[0], parentTypeName, jsonPropName, true, ctx); return hasNull || !isRequired ? pyOptionalResolvedType(inner) : inner; } if (nonNull.length > 1) { const discriminator = findPyDiscriminator(nonNull); if (discriminator) { // Prefer the proper per-variant union shape when every variant // is a `$ref` to a sibling definition. Same rationale as in the // top-level $ref branch above: matches TS/Rust/.NET/Go. if (variantSchemas.every((s) => typeof s.$ref === "string")) { const unionResolved = tryEmitPyRefBasedDiscriminatedUnion( nestedName, propSchema, ctx ); if (unionResolved) { return hasNull || !isRequired ? pyOptionalResolvedType(unionResolved) : unionResolved; } } emitPyFlatDiscriminatedUnion( nestedName, discriminator.property, discriminator.mapping, ctx, propSchema.description, isSchemaExperimental(propSchema) ); const resolved: PyResolvedType = { annotation: nestedName, fromExpr: (expr) => `${nestedName}.from_dict(${expr})`, toExpr: (expr) => `to_class(${nestedName}, ${expr})`, }; return hasNull || !isRequired ? pyOptionalResolvedType(resolved) : resolved; } const namedUnion = resolvePyNamedUnion( nestedName, variantSchemas.filter((schema) => !isPyNullLikeSchema(resolveSchema(schema, ctx.definitions) ?? schema)), ctx, propSchema.description ); if (namedUnion) { return hasNull || !isRequired ? pyOptionalResolvedType(namedUnion) : namedUnion; } return pyAnyResolvedType(); } } if (propSchema.enum && Array.isArray(propSchema.enum) && propSchema.enum.every((value) => typeof value === "string")) { const enumType = getOrCreatePyEnum( nestedName, propSchema.enum as string[], ctx, propSchema.description, getEnumValueDescriptions(propSchema), isSchemaDeprecated(propSchema), isSchemaExperimental(propSchema) ); const resolved: PyResolvedType = { annotation: enumType, fromExpr: (expr) => `parse_enum(${enumType}, ${expr})`, toExpr: (expr) => `to_enum(${enumType}, ${expr})`, }; return isRequired ? resolved : pyOptionalResolvedType(resolved); } if (propSchema.const !== undefined) { if (typeof propSchema.const === "string") { const resolved = pyPrimitiveResolvedType("str", "from_str"); return isRequired ? resolved : pyOptionalResolvedType(resolved); } if (typeof propSchema.const === "boolean") { const resolved = pyPrimitiveResolvedType("bool", "from_bool"); return isRequired ? resolved : pyOptionalResolvedType(resolved); } if (typeof propSchema.const === "number") { const resolved = Number.isInteger(propSchema.const) ? pyPrimitiveResolvedType("int", "from_int", "to_int") : pyPrimitiveResolvedType("float", "from_float", "to_float"); return isRequired ? resolved : pyOptionalResolvedType(resolved); } } const type = propSchema.type; const format = propSchema.format; if (Array.isArray(type)) { const nonNullTypes = type.filter((value) => value !== "null"); if (nonNullTypes.length === 1) { const inner = resolvePyPropertyType( { ...propSchema, type: nonNullTypes[0] as JSONSchema7["type"] }, parentTypeName, jsonPropName, true, ctx ); return pyOptionalResolvedType(inner); } } if (type === "string") { if (format === "date-time") { const resolved = pyPrimitiveResolvedType("datetime", "from_datetime", "to_datetime"); return isRequired ? resolved : pyOptionalResolvedType(resolved); } if (format === "uuid") { const resolved = pyPrimitiveResolvedType("UUID", "from_uuid", "to_uuid"); return isRequired ? resolved : pyOptionalResolvedType(resolved); } if (format === "uri" || format === "regex" || isPyBase64StringSchema(propSchema)) { const resolved = pyPrimitiveResolvedType("str", "from_str"); return isRequired ? resolved : pyOptionalResolvedType(resolved); } const resolved = pyPrimitiveResolvedType("str", "from_str"); return isRequired ? resolved : pyOptionalResolvedType(resolved); } if (type === "integer") { if (format === "duration" && !isSecondsDurationPropertyName(jsonPropName)) { const resolved = pyDurationResolvedType(ctx, true); return isRequired ? resolved : pyOptionalResolvedType(resolved); } const resolved = pyPrimitiveResolvedType("int", "from_int", "to_int"); return isRequired ? resolved : pyOptionalResolvedType(resolved); } if (type === "number") { if (format === "duration" && !isSecondsDurationPropertyName(jsonPropName)) { const resolved = pyDurationResolvedType(ctx, false); return isRequired ? resolved : pyOptionalResolvedType(resolved); } const resolved = pyPrimitiveResolvedType("float", "from_float", "to_float"); return isRequired ? resolved : pyOptionalResolvedType(resolved); } if (type === "boolean") { const resolved = pyPrimitiveResolvedType("bool", "from_bool"); return isRequired ? resolved : pyOptionalResolvedType(resolved); } if (type === "array") { const items = propSchema.items as JSONSchema7 | undefined; if (!items) { const resolved: PyResolvedType = { annotation: "list[Any]", fromExpr: (expr) => `from_list(lambda x: x, ${expr})`, toExpr: (expr) => `from_list(lambda x: x, ${expr})`, }; return isRequired ? resolved : pyOptionalResolvedType(resolved); } if (items.allOf && items.allOf.length === 1 && typeof items.allOf[0] === "object") { return resolvePyPropertyType( { ...propSchema, items: items.allOf[0] as JSONSchema7 }, parentTypeName, jsonPropName, isRequired, ctx ); } if (items.anyOf) { const itemVariants = (items.anyOf as JSONSchema7[]) .filter((variant) => typeof variant === "object") .map( (variant) => resolveObjectSchema(variant as JSONSchema7, ctx.definitions) ?? resolveSchema(variant as JSONSchema7, ctx.definitions) ?? (variant as JSONSchema7) ) .filter((variant) => variant.type !== "null"); const discriminator = findPyDiscriminator(itemVariants); if (discriminator) { const itemTypeName = nestedName + "Item"; emitPyFlatDiscriminatedUnion( itemTypeName, discriminator.property, discriminator.mapping, ctx, items.description, isSchemaExperimental(items) ); const resolved: PyResolvedType = { annotation: `list[${itemTypeName}]`, fromExpr: (expr) => `from_list(${itemTypeName}.from_dict, ${expr})`, toExpr: (expr) => `from_list(lambda x: to_class(${itemTypeName}, x), ${expr})`, }; return isRequired ? resolved : pyOptionalResolvedType(resolved); } } const itemType = resolvePyPropertyType(items, parentTypeName, jsonPropName + "Item", true, ctx); const resolved: PyResolvedType = { annotation: `list[${itemType.annotation}]`, fromExpr: (expr) => `from_list(${wrapParser(itemType)}, ${expr})`, toExpr: (expr) => `from_list(${wrapSerializer(itemType)}, ${expr})`, }; return isRequired ? resolved : pyOptionalResolvedType(resolved); } if (type === "object" || (propSchema.properties && !type)) { if (propSchema.properties) { emitPyClass(nestedName, propSchema, ctx, propSchema.description); const resolved: PyResolvedType = { annotation: nestedName, fromExpr: (expr) => `${nestedName}.from_dict(${expr})`, toExpr: (expr) => `to_class(${nestedName}, ${expr})`, }; return isRequired ? resolved : pyOptionalResolvedType(resolved); } if (propSchema.additionalProperties) { if ( typeof propSchema.additionalProperties === "object" && Object.keys(propSchema.additionalProperties as Record).length > 0 ) { const valueType = resolvePyPropertyType( propSchema.additionalProperties as JSONSchema7, parentTypeName, jsonPropName + "Value", true, ctx ); const resolved: PyResolvedType = { annotation: `dict[str, ${valueType.annotation}]`, fromExpr: (expr) => `from_dict(${wrapParser(valueType)}, ${expr})`, toExpr: (expr) => `from_dict(${wrapSerializer(valueType)}, ${expr})`, }; return isRequired ? resolved : pyOptionalResolvedType(resolved); } const resolved: PyResolvedType = { annotation: "dict[str, Any]", fromExpr: (expr) => `from_dict(lambda x: x, ${expr})`, toExpr: (expr) => `from_dict(lambda x: x, ${expr})`, }; return isRequired ? resolved : pyOptionalResolvedType(resolved); } return pyAnyResolvedType(); } return pyAnyResolvedType(); } function emitPyClass( typeName: string, schema: JSONSchema7, ctx: PyCodegenCtx, description?: string, experimental = isSchemaExperimental(schema) ): void { if (ctx.generatedNames.has(typeName)) { return; } ctx.generatedNames.add(typeName); const required = new Set(schema.required || []); const fieldEntries = Object.entries(schema.properties || {}).filter( ([, value]) => typeof value === "object" ) as Array<[string, JSONSchema7]>; const orderedFieldEntries = [ ...fieldEntries.filter(([name]) => required.has(name)).sort(([a], [b]) => a.localeCompare(b)), ...fieldEntries.filter(([name]) => !required.has(name)).sort(([a], [b]) => a.localeCompare(b)), ]; const fieldInfos = orderedFieldEntries.map(([propName, propSchema]) => { const isRequired = required.has(propName); const resolved = resolvePyPropertyType(propSchema, typeName, propName, isRequired, ctx); const baseFieldName = toPyFieldName(propName, propSchema, ctx); const fieldName = isSchemaInternal(propSchema) ? `_${baseFieldName}` : baseFieldName; return { jsonName: propName, fieldName, isRequired, resolved, }; }); const lines: string[] = []; if (experimental) { pushPyExperimentalComment(lines, "type"); } if (isSchemaDeprecated(schema)) { lines.push(`# Deprecated: this type is deprecated and will be removed in a future version.`); } lines.push(`@dataclass`); lines.push(`class ${typeName}:`); if (description || schema.description) { lines.push(` ${pyDocstringLiteral(description || schema.description || "")}`); } if (fieldInfos.length === 0) { lines.push(` @staticmethod`); lines.push(` def from_dict(obj: Any) -> "${typeName}":`); lines.push(` assert isinstance(obj, dict)`); lines.push(` return ${typeName}()`); lines.push(``); lines.push(` def to_dict(self) -> dict:`); lines.push(` return {}`); ctx.classes.push(lines.join("\n")); return; } for (const field of fieldInfos) { const suffix = field.isRequired ? "" : " = None"; const propSchema = orderedFieldEntries.find(([n]) => n === field.jsonName)?.[1] as JSONSchema7 | undefined; pushPyFieldMarkers(lines, propSchema); lines.push(` ${field.fieldName}: ${field.resolved.annotation}${suffix}`); } lines.push(``); lines.push(` @staticmethod`); lines.push(` def from_dict(obj: Any) -> "${typeName}":`); lines.push(` assert isinstance(obj, dict)`); for (const field of fieldInfos) { const sourceExpr = `obj.get(${JSON.stringify(field.jsonName)})`; lines.push( ` ${field.fieldName} = ${field.resolved.fromExpr(sourceExpr)}` ); } lines.push(` return ${typeName}(`); for (const field of fieldInfos) { lines.push(` ${field.fieldName}=${field.fieldName},`); } lines.push(` )`); lines.push(``); lines.push(` def to_dict(self) -> dict:`); lines.push(` result: dict = {}`); for (const field of fieldInfos) { const valueExpr = field.resolved.toExpr(`self.${field.fieldName}`); if (field.isRequired) { lines.push(` result[${JSON.stringify(field.jsonName)}] = ${valueExpr}`); } else { lines.push(` if self.${field.fieldName} is not None:`); lines.push(` result[${JSON.stringify(field.jsonName)}] = ${valueExpr}`); } } lines.push(` return result`); ctx.classes.push(lines.join("\n")); } function emitPyFlatDiscriminatedUnion( typeName: string, discriminatorProp: string, mapping: Map, ctx: PyCodegenCtx, description?: string, experimental = false ): void { if (ctx.generatedNames.has(typeName)) { return; } ctx.generatedNames.add(typeName); const allProps = new Map(); for (const [, variant] of mapping) { const required = new Set(variant.required || []); for (const [propName, propSchema] of Object.entries(variant.properties || {})) { if (typeof propSchema !== "object") { continue; } if (!allProps.has(propName)) { allProps.set(propName, { schema: propSchema as JSONSchema7, requiredInAll: required.has(propName), }); } else if (!required.has(propName)) { allProps.get(propName)!.requiredInAll = false; } } } const variantCount = mapping.size; for (const [propName, info] of allProps) { let presentCount = 0; for (const [, variant] of mapping) { if (variant.properties && propName in variant.properties) { presentCount++; } } if (presentCount < variantCount) { info.requiredInAll = false; } } const discriminatorEnumName = getOrCreatePyEnum( typeName + toPascalCase(discriminatorProp), [...mapping.keys()], ctx, description ? `${description} discriminator` : `${typeName} discriminator`, undefined, false, experimental ); const fieldEntries: Array<[string, JSONSchema7, boolean]> = [ [ discriminatorProp, { type: "string", enum: [...mapping.keys()], }, true, ], ...[...allProps.entries()] .filter(([propName]) => propName !== discriminatorProp) .map(([propName, info]) => [propName, info.schema, info.requiredInAll] as [string, JSONSchema7, boolean]), ]; const orderedFieldEntries = [ ...fieldEntries.filter(([, , requiredInAll]) => requiredInAll).sort(([a], [b]) => a.localeCompare(b)), ...fieldEntries.filter(([, , requiredInAll]) => !requiredInAll).sort(([a], [b]) => a.localeCompare(b)), ]; const fieldInfos = orderedFieldEntries.map(([propName, propSchema, requiredInAll]) => { let resolved: PyResolvedType; if (propName === discriminatorProp) { resolved = { annotation: discriminatorEnumName, fromExpr: (expr) => `parse_enum(${discriminatorEnumName}, ${expr})`, toExpr: (expr) => `to_enum(${discriminatorEnumName}, ${expr})`, }; } else { resolved = resolvePyPropertyType(propSchema, typeName, propName, requiredInAll, ctx); } return { jsonName: propName, fieldName: isSchemaInternal(propSchema) ? `_${toPyFieldName(propName, propSchema, ctx)}` : toPyFieldName(propName, propSchema, ctx), isRequired: requiredInAll, resolved, }; }); const lines: string[] = []; if (experimental) { pushPyExperimentalComment(lines, "type"); } lines.push(`@dataclass`); lines.push(`class ${typeName}:`); if (description) { lines.push(` ${pyDocstringLiteral(description)}`); } for (const field of fieldInfos) { const suffix = field.isRequired ? "" : " = None"; const fieldSchema = orderedFieldEntries.find(([n]) => n === field.jsonName)?.[1] as JSONSchema7 | undefined; pushPyFieldMarkers(lines, fieldSchema); lines.push(` ${field.fieldName}: ${field.resolved.annotation}${suffix}`); } lines.push(``); lines.push(` @staticmethod`); lines.push(` def from_dict(obj: Any) -> "${typeName}":`); lines.push(` assert isinstance(obj, dict)`); for (const field of fieldInfos) { const sourceExpr = `obj.get(${JSON.stringify(field.jsonName)})`; lines.push( ` ${field.fieldName} = ${field.resolved.fromExpr(sourceExpr)}` ); } lines.push(` return ${typeName}(`); for (const field of fieldInfos) { lines.push(` ${field.fieldName}=${field.fieldName},`); } lines.push(` )`); lines.push(``); lines.push(` def to_dict(self) -> dict:`); lines.push(` result: dict = {}`); for (const field of fieldInfos) { const valueExpr = field.resolved.toExpr(`self.${field.fieldName}`); if (field.isRequired) { lines.push(` result[${JSON.stringify(field.jsonName)}] = ${valueExpr}`); } else { lines.push(` if self.${field.fieldName} is not None:`); lines.push(` result[${JSON.stringify(field.jsonName)}] = ${valueExpr}`); } } lines.push(` return result`); ctx.classes.push(lines.join("\n")); } export function generatePythonSessionEventsCode(schema: JSONSchema7): string { const variants = extractPyEventVariants(schema); const ctx: PyCodegenCtx = { classes: [], aliases: [], aliasesByName: new Set(), enums: [], enumsByName: new Map(), generatedNames: new Set(), usesTimedelta: false, usesIntegerTimedelta: false, definitions: collectDefinitionCollections(schema as Record), refBasedUnions: [], }; for (const variant of variants) { emitPyClass( variant.dataClassName, variant.dataSchema, ctx, variant.dataDescription, variant.dataExperimental ); } const envelopeProperties = getPySharedEventEnvelopeProperties(schema, ctx); const envelopePropertiesWithoutDefaults = envelopeProperties.filter((property) => !property.hasDefault); const envelopePropertiesWithDefaults = envelopeProperties.filter((property) => property.hasDefault); const eventTypeLines: string[] = []; eventTypeLines.push(`class SessionEventType(Enum):`); for (const variant of variants) { if (variant.eventExperimental) { pushPyExperimentalComment(eventTypeLines, "event", " "); } eventTypeLines.push(` ${toEnumMemberName(variant.typeName)} = ${JSON.stringify(variant.typeName)}`); } eventTypeLines.push(` UNKNOWN = "unknown"`); eventTypeLines.push(``); eventTypeLines.push(` @classmethod`); eventTypeLines.push(` def _missing_(cls, value: object) -> "SessionEventType":`); eventTypeLines.push(` return cls.UNKNOWN`); const out: string[] = []; out.push(`"""`); out.push(`AUTO-GENERATED FILE - DO NOT EDIT`); out.push(`Generated from: session-events.schema.json`); out.push(`"""`); out.push(``); out.push(`from __future__ import annotations`); out.push(``); out.push(`from collections.abc import Callable`); out.push(`from dataclasses import dataclass`); out.push(ctx.usesTimedelta ? `from datetime import datetime, timedelta` : `from datetime import datetime`); out.push(`from enum import Enum`); out.push(`from typing import Any, TypeVar, cast`); out.push(`from uuid import UUID`); out.push(``); out.push(`import dateutil.parser`); out.push(``); out.push(`T = TypeVar("T")`); out.push(`EnumT = TypeVar("EnumT", bound=Enum)`); out.push(``); out.push(``); out.push(`def from_str(x: Any) -> str:`); out.push(` assert isinstance(x, str)`); out.push(` return x`); out.push(``); out.push(``); out.push(`def from_int(x: Any) -> int:`); out.push(` assert isinstance(x, int) and not isinstance(x, bool)`); out.push(` return x`); out.push(``); out.push(``); out.push(`def to_int(x: Any) -> int:`); out.push(` assert isinstance(x, int) and not isinstance(x, bool)`); out.push(` return x`); out.push(``); out.push(``); out.push(`def from_float(x: Any) -> float:`); out.push(` assert isinstance(x, (float, int)) and not isinstance(x, bool)`); out.push(` return float(x)`); out.push(``); out.push(``); out.push(`def to_float(x: Any) -> float:`); out.push(` assert isinstance(x, (float, int)) and not isinstance(x, bool)`); out.push(` return float(x)`); out.push(``); out.push(``); if (ctx.usesTimedelta) { out.push(`def from_timedelta(x: Any) -> timedelta:`); out.push(` assert isinstance(x, (float, int)) and not isinstance(x, bool)`); out.push(` return timedelta(milliseconds=float(x))`); out.push(``); out.push(``); if (ctx.usesIntegerTimedelta) { out.push(`def to_timedelta_int(x: timedelta) -> int:`); out.push(` assert isinstance(x, timedelta)`); out.push(` milliseconds = x.total_seconds() * 1000.0`); out.push(` assert milliseconds.is_integer()`); out.push(` return int(milliseconds)`); out.push(``); out.push(``); } out.push(`def to_timedelta(x: timedelta) -> float:`); out.push(` assert isinstance(x, timedelta)`); out.push(` return x.total_seconds() * 1000.0`); out.push(``); out.push(``); } out.push(`def from_bool(x: Any) -> bool:`); out.push(` assert isinstance(x, bool)`); out.push(` return x`); out.push(``); out.push(``); out.push(`def from_none(x: Any) -> Any:`); out.push(` assert x is None`); out.push(` return x`); out.push(``); out.push(``); out.push(`def from_union(fs: list[Callable[[Any], T]], x: Any) -> T:`); out.push(` for f in fs:`); out.push(` try:`); out.push(` return f(x)`); out.push(` except Exception:`); out.push(` pass`); out.push(` assert False`); out.push(``); out.push(``); out.push(`def from_list(f: Callable[[Any], T], x: Any) -> list[T]:`); out.push(` assert isinstance(x, list)`); out.push(` return [f(item) for item in x]`); out.push(``); out.push(``); out.push(`def from_dict(f: Callable[[Any], T], x: Any) -> dict[str, T]:`); out.push(` assert isinstance(x, dict)`); out.push(` return {key: f(value) for key, value in x.items()}`); out.push(``); out.push(``); out.push(`def from_datetime(x: Any) -> datetime:`); out.push(` return dateutil.parser.parse(from_str(x))`); out.push(``); out.push(``); out.push(`def to_datetime(x: datetime) -> str:`); out.push(` return x.isoformat()`); out.push(``); out.push(``); out.push(`def from_uuid(x: Any) -> UUID:`); out.push(` return UUID(from_str(x))`); out.push(``); out.push(``); out.push(`def to_uuid(x: UUID) -> str:`); out.push(` return str(x)`); out.push(``); out.push(``); out.push(`def parse_enum(c: type[EnumT], x: Any) -> EnumT:`); out.push(` assert isinstance(x, str)`); out.push(` return c(x)`); out.push(``); out.push(``); out.push(`def to_class(c: type[T], x: Any) -> dict:`); out.push(` assert isinstance(x, c)`); out.push(` return cast(Any, x).to_dict()`); out.push(``); out.push(``); out.push(`def to_enum(c: type[EnumT], x: Any) -> str:`); out.push(` assert isinstance(x, c)`); out.push(` return cast(str, x.value)`); out.push(``); out.push(``); out.push(eventTypeLines.join("\n")); out.push(``); out.push(``); out.push(`@dataclass`); out.push(`class RawSessionEventData:`); out.push(` raw: Any`); out.push(``); out.push(` @staticmethod`); out.push(` def from_dict(obj: Any) -> "RawSessionEventData":`); out.push(` return RawSessionEventData(obj)`); out.push(``); out.push(` def to_dict(self) -> Any:`); out.push(` return self.raw`); out.push(``); out.push(``); out.push(`def _compat_to_python_key(name: str) -> str:`); out.push(` normalized = name.replace(".", "_")`); out.push(` result: list[str] = []`); out.push(` for index, char in enumerate(normalized):`); out.push( ` if char.isupper() and index > 0 and (not normalized[index - 1].isupper() or (index + 1 < len(normalized) and normalized[index + 1].islower())):` ); out.push(` result.append("_")`); out.push(` result.append(char.lower())`); out.push(` return "".join(result)`); out.push(``); out.push(``); out.push(`def _compat_to_json_key(name: str) -> str:`); out.push(` parts = name.split("_")`); out.push(` if not parts:`); out.push(` return name`); out.push(` return parts[0] + "".join(part[:1].upper() + part[1:] for part in parts[1:])`); out.push(``); out.push(``); out.push(`def _compat_to_json_value(value: Any) -> Any:`); out.push(` if hasattr(value, "to_dict"):`); out.push(` return cast(Any, value).to_dict()`); out.push(` if isinstance(value, Enum):`); out.push(` return value.value`); out.push(` if isinstance(value, datetime):`); out.push(` return value.isoformat()`); if (ctx.usesTimedelta) { out.push(` if isinstance(value, timedelta):`); out.push(` return value.total_seconds() * 1000.0`); } out.push(` if isinstance(value, UUID):`); out.push(` return str(value)`); out.push(` if isinstance(value, list):`); out.push(` return [_compat_to_json_value(item) for item in value]`); out.push(` if isinstance(value, dict):`); out.push(` return {key: _compat_to_json_value(item) for key, item in value.items()}`); out.push(` return value`); out.push(``); out.push(``); out.push(`def _compat_from_json_value(value: Any) -> Any:`); out.push(` return value`); out.push(``); out.push(``); out.push(`class Data:`); out.push(` """Backward-compatible shim for manually constructed event payloads."""`); out.push(``); out.push(` def __init__(self, **kwargs: Any):`); out.push(` self._values = {key: _compat_from_json_value(value) for key, value in kwargs.items()}`); out.push(` for key, value in self._values.items():`); out.push(` setattr(self, key, value)`); out.push(``); out.push(` @staticmethod`); out.push(` def from_dict(obj: Any) -> "Data":`); out.push(` assert isinstance(obj, dict)`); out.push( ` return Data(**{_compat_to_python_key(key): _compat_from_json_value(value) for key, value in obj.items()})` ); out.push(``); out.push(` def to_dict(self) -> dict:`); out.push( ` return {_compat_to_json_key(key): _compat_to_json_value(value) for key, value in self._values.items() if value is not None}` ); out.push(``); out.push(``); for (const classDef of ctx.classes.sort()) { out.push(classDef); out.push(``); out.push(``); } for (const aliasDef of ctx.aliases.sort()) { out.push(aliasDef); out.push(``); out.push(``); } for (const enumDef of ctx.enums.sort()) { out.push(enumDef); out.push(``); out.push(``); } const sessionEventDataTypes = [ ...variants.map((variant) => variant.dataClassName), "RawSessionEventData", "Data", ]; out.push(`SessionEventData = ${sessionEventDataTypes.join(" | ")}`); out.push(``); out.push(``); out.push(`@dataclass`); out.push(`class SessionEvent:`); out.push(` data: SessionEventData`); for (const property of envelopePropertiesWithoutDefaults) { out.push(` ${property.fieldName}: ${property.resolved.annotation}`); } out.push(` type: SessionEventType`); for (const property of envelopePropertiesWithDefaults) { out.push(` ${property.fieldName}: ${property.resolved.annotation} = None`); } out.push(` raw_type: str | None = None`); out.push(``); out.push(` @staticmethod`); out.push(` def from_dict(obj: Any) -> "SessionEvent":`); out.push(` assert isinstance(obj, dict)`); out.push(` raw_type = from_str(obj.get("type"))`); out.push(` event_type = SessionEventType(raw_type)`); for (const property of envelopeProperties) { out.push(` ${property.fieldName} = ${property.resolved.fromExpr(`obj.get(${JSON.stringify(property.jsonName)})`)}`); } out.push(` data_obj = obj.get("data")`); out.push(` match event_type:`); for (const variant of variants) { out.push( ` case SessionEventType.${toEnumMemberName(variant.typeName)}: data = ${variant.dataClassName}.from_dict(data_obj)` ); } out.push(` case _: data = RawSessionEventData.from_dict(data_obj)`); out.push(` return SessionEvent(`); out.push(` data=data,`); for (const property of envelopePropertiesWithoutDefaults) { out.push(` ${property.fieldName}=${property.fieldName},`); } out.push(` type=event_type,`); for (const property of envelopePropertiesWithDefaults) { out.push(` ${property.fieldName}=${property.fieldName},`); } out.push(` raw_type=raw_type if event_type == SessionEventType.UNKNOWN else None,`); out.push(` )`); out.push(``); out.push(` def to_dict(self) -> dict:`); out.push(` result: dict = {}`); out.push(` result["data"] = self.data.to_dict()`); for (const property of envelopePropertiesWithoutDefaults) { out.push(` result[${JSON.stringify(property.jsonName)}] = ${property.resolved.toExpr(`self.${property.fieldName}`)}`); } out.push( ` result["type"] = self.raw_type if self.type == SessionEventType.UNKNOWN and self.raw_type is not None else to_enum(SessionEventType, self.type)` ); for (const property of envelopePropertiesWithDefaults) { const valueExpr = property.resolved.toExpr(`self.${property.fieldName}`); if (property.required) { out.push(` result[${JSON.stringify(property.jsonName)}] = ${valueExpr}`); } else { out.push(` if self.${property.fieldName} is not None:`); out.push(` result[${JSON.stringify(property.jsonName)}] = ${valueExpr}`); } } out.push(` return result`); out.push(``); out.push(``); out.push(`def session_event_from_dict(s: Any) -> SessionEvent:`); out.push(` return SessionEvent.from_dict(s)`); out.push(``); out.push(``); out.push(`def session_event_to_dict(x: SessionEvent) -> Any:`); out.push(` return x.to_dict()`); out.push(``); out.push(``); let finalCode = postProcessPythonSessionEventCode(out.join("\n")); finalCode = postProcessDiscriminatorDefaultsForPython(finalCode, ctx.refBasedUnions); return finalCode; } async function generateSessionEvents(schemaPath?: string): Promise { console.log("Python: generating session-events..."); const resolvedPath = schemaPath ?? (await getSessionEventsSchemaPath()); const schema = (await loadSchemaJson(resolvedPath)) as JSONSchema7; const processed = propagateInternalVisibility(postProcessSchema(schema)); let code = generatePythonSessionEventsCode(processed); const { typeNames } = collectInternalSymbols(processed); code = renameInternalPythonSymbols(code, typeNames); code = appendPythonSessionEventsAllList(code, processed, typeNames); const outPath = await writeGeneratedFile("python/copilot/generated/session_events.py", code); console.log(` ✓ ${outPath}`); } // ── RPC Types ─────────────────────────────────────────────────────────────── async function generateRpc(schemaPath?: string, sessionEventsSchema?: JSONSchema7): Promise { console.log("Python: generating RPC types..."); const { FetchingJSONSchemaStore, InputData, JSONSchemaInput, quicktype } = await import("quicktype-core"); const resolvedPath = schemaPath ?? (await getApiSchemaPath()); let schema = fixNullableRequiredRefsInApiSchema(cloneSchemaForCodegen((await loadSchemaJson(resolvedPath)) as ApiSchema)); if (sessionEventsSchema) { const sharedDefinitions = findSharedSchemaDefinitions( schema as unknown as Record, sessionEventsSchema as unknown as Record ); const reachableDefinitions = collectReachableDefinitionNames(sessionEventsSchema as unknown as Record); const exportedSessionEventTypes = collectPythonSessionEventExportedTypeNames(sessionEventsSchema); for (const name of [...sharedDefinitions]) { if (!reachableDefinitions.has(name) || !exportedSessionEventTypes.has(name)) { sharedDefinitions.delete(name); } } schema = rewriteSharedDefinitionReferences(schema, sharedDefinitions, "session-events.schema.json"); } const allMethods = [ ...collectRpcMethods(schema.server || {}), ...collectRpcMethods(schema.session || {}), ...collectRpcMethods(schema.clientSession || {}), ]; // Build a combined schema for quicktype, including shared definitions from the API schema rpcDefinitions = collectDefinitionCollections(schema as Record); const combinedSchema = withSharedDefinitions( { $schema: "http://json-schema.org/draft-07/schema#", }, rpcDefinitions ); for (const method of allMethods) { const resultSchema = getMethodResultSchema(method); if (!isVoidSchema(resultSchema)) { const nullableInner = resultSchema ? getNullableInner(resultSchema) : undefined; if (!nullableInner) { combinedSchema.definitions![pythonResultTypeName(method)] = withRootTitle( schemaSourceForNamedDefinition(method.result, resultSchema), pythonResultTypeName(method) ); } // For nullable results, the inner type (e.g., SessionFsError) is already in definitions } const resolvedParams = getMethodParamsSchema(method); if (method.params && hasSchemaPayload(resolvedParams)) { if (method.rpcMethod.startsWith("session.") && resolvedParams?.properties) { const filtered: JSONSchema7 = { ...resolvedParams, properties: Object.fromEntries( Object.entries(resolvedParams.properties).filter(([k]) => k !== "sessionId") ), required: resolvedParams.required?.filter((r) => r !== "sessionId"), }; if (hasSchemaPayload(filtered)) { combinedSchema.definitions![pythonParamsTypeName(method)] = withRootTitle( filtered, pythonParamsTypeName(method) ); } } else { combinedSchema.definitions![pythonParamsTypeName(method)] = withRootTitle( schemaSourceForNamedDefinition(method.params, resolvedParams), pythonParamsTypeName(method) ); } } } const allDefinitions = combinedSchema.definitions! as Record; preservePythonRpcStringDateFields(allDefinitions); const allDefinitionCollections: DefinitionCollections = { definitions: { ...(combinedSchema.$defs ?? {}), ...allDefinitions }, $defs: { ...allDefinitions, ...(combinedSchema.$defs ?? {}) }, }; // Generate types via quicktype — use a single combined schema source to avoid // quicktype inventing Purple/Fluffy disambiguation prefixes for shared types const schemaInput = new JSONSchemaInput(new FetchingJSONSchemaStore()); const singleSchema: Record = { $schema: "http://json-schema.org/draft-07/schema#", type: "object", definitions: stripBooleanLiterals(allDefinitions), properties: Object.fromEntries( Object.keys(allDefinitions).map((name) => [name, { $ref: `#/definitions/${name}` }]) ), required: Object.keys(allDefinitions), }; const externalRefs = rewriteExternalRefsForPython(singleSchema as JSONSchema7 & { definitions?: Record }); const externalEnumNames = collectPythonExternalEnumNames(sessionEventsSchema, externalRefs.placeholderNames); const externalUnionAliases = collectExternalUnionAliasesForPython( singleSchema.definitions as Record, externalRefs.placeholderNames ); await schemaInput.addSource({ name: "RPC", schema: JSON.stringify(singleSchema) }); const inputData = new InputData(); inputData.addInput(schemaInput); const qtResult = await quicktype({ inputData, lang: "python", rendererOptions: { "python-version": "3.7" }, // Disable quicktype's structural-equality merging of class types. // It produces fuzzy synthesized names (e.g. ``PermissionDecisionApproveForIonApproval`` // as the merge of ``PermissionDecisionApproveFor{Session,Location}Approval``) which // are unstable: any future divergence between the variants would silently change // the generated class name. We rely on the schema's named definitions and resolve // structural unions via :func:`postProcessRefBasedDiscriminatedUnionsForPython`, // so the merging is also redundant. inferenceFlags: { combineClasses: false }, }); let typesCode = qtResult.lines.join("\n"); // Quicktype emits optional Any-typed fields without defaults; add them back. typesCode = typesCode.replace(/: Any$/gm, ": Any = None"); // The synthesized root RPC dataclass includes one required field per schema definition. // Keep Any-typed definition fields required so later required fields don't trip dataclass // ordering rules at import time. typesCode = typesCode.replace( /(@dataclass\r?\nclass RPC:\r?\n)([\s\S]*?)(\r?\n @staticmethod)/, (match, prefix: string, body: string, suffix: string) => { let updatedBody = body; for (const definitionName of Object.keys(allDefinitions)) { const fieldName = toSnakeCase(definitionName); updatedBody = updatedBody.replace(new RegExp(`^( ${fieldName}: Any) = None$`, "m"), "$1"); } return `${prefix}${updatedBody}${suffix}`; } ); typesCode = removeRequiredAnyDefaultsForPython(typesCode, allDefinitions, allDefinitionCollections); // Fix bare except: to use Exception (required by ruff/pylint) typesCode = typesCode.replace(/except:/g, "except Exception:"); // Remove unnecessary pass when class has methods (quicktype generates pass for empty schemas) typesCode = typesCode.replace(/^(\s*)pass\n\n(\s*@staticmethod)/gm, "$2"); // Modernize to Python 3.11+ syntax typesCode = modernizePython(typesCode); const knownDefNames = new Set(Object.keys(allDefinitions).map((n) => n.toLowerCase())); typesCode = collapsePlaceholderPythonDataclasses(typesCode, knownDefNames); typesCode = postProcessExternalUnionAliasesForPython(typesCode, externalUnionAliases); typesCode = postProcessExternalRefsForPython(typesCode, externalRefs.placeholderNames, externalEnumNames); const { code: typesCodeAfterUnions, unions: refBasedUnions } = postProcessRefBasedDiscriminatedUnionsForPython( typesCode, allDefinitions, allDefinitionCollections ); typesCode = typesCodeAfterUnions; typesCode = modernizePython(typesCode); // Fix quicktype's Enum-suffix renaming: quicktype sometimes renames "Xyz" to // "XyzEnum" to avoid internal collisions. Strip the suffix to match our schema // definition names when that is unambiguous. If the schema already led // quicktype to emit both names, keep quicktype's disambiguated suffix. for (const defName of Object.keys(allDefinitions)) { const enumSuffixed = defName + "Enum"; if (Object.prototype.hasOwnProperty.call(allDefinitions, enumSuffixed)) continue; if (!new RegExp(`\\bclass ${enumSuffixed}\\b`).test(typesCode)) continue; const renamed = typesCode.replace(new RegExp(`\\b${enumSuffixed}\\b`, "g"), defName); const classCount = (renamed.match(new RegExp(`^class ${defName}\\b`, "gm")) ?? []).length; if (classCount > 1) { continue; } typesCode = renamed; } // Reorder class/enum definitions to resolve forward references. // Quicktype may emit classes before their dependencies are defined. typesCode = reorderPythonForwardRefs(typesCode); // Strip quicktype's import block and preamble — we provide our own unified header. // The preamble ends just before the first helper function (e.g. "def from_str") // or class definition. typesCode = typesCode.replace(/^[\s\S]*?(?=^(?:def |@dataclass|class )\w)/m, ""); // Strip trailing whitespace from blank lines (e.g. inside multi-line docstrings) typesCode = typesCode.replace(/^\s+$/gm, ""); // Annotate experimental data types const experimentalTypeNames = new Set(); for (const name of collectExperimentalOnlyRpcReferencedDefinitionNames(allMethods, allDefinitionCollections)) { experimentalTypeNames.add(name); } const nonExperimentalReferencedTypes = collectRpcMethodReferencedDefinitionNames( allMethods.filter((method) => method.stability !== "experimental"), allDefinitionCollections ); for (const [definitionName, definition] of Object.entries(allDefinitions)) { if (typeof definition === "object" && definition !== null && isSchemaExperimental(definition as JSONSchema7)) { experimentalTypeNames.add(definitionName); } } for (const method of allMethods) { if (method.stability !== "experimental") continue; if (!nonExperimentalReferencedTypes.has(pythonResultTypeName(method))) { experimentalTypeNames.add(pythonResultTypeName(method)); } const paramsTypeName = pythonParamsTypeName(method); if (allDefinitions[paramsTypeName] && !nonExperimentalReferencedTypes.has(paramsTypeName)) { experimentalTypeNames.add(paramsTypeName); } } // Annotate deprecated data types const deprecatedTypeNames = new Set(); for (const method of allMethods) { if (!method.deprecated) continue; if (!method.result?.$ref) { deprecatedTypeNames.add(pythonResultTypeName(method)); } if (!method.params?.$ref) { const paramsTypeName = pythonParamsTypeName(method); if (allDefinitions[paramsTypeName]) { deprecatedTypeNames.add(paramsTypeName); } } } // Annotate internal data types (driven by the JSON Schema definition's // `visibility: "internal"` flag, set via `.asInternal()` on the Zod source). const internalTypeNames = new Set(); for (const [name, def] of Object.entries(allDefinitions)) { if (def && typeof def === "object" && (def as Record).visibility === "internal") { internalTypeNames.add(name); } } // Extract actual class names generated by quicktype (may differ from toPascalCase, // e.g. quicktype produces "SessionMCPList" not "SessionMcpList") const actualTypeNames = new Map(); const classRe = /^class\s+(\w+)\b/gm; let cm; while ((cm = classRe.exec(typesCode)) !== null) { actualTypeNames.set(cm[1].toLowerCase(), cm[1]); } // quicktype can also choose a shorter generated class name for a titled schema // definition. Its root RPC dataclass still records the definition field and // generated class mapping, so use that as an alias table for RPC wrappers. const definitionAliases = new Map(); const publicTypeAliases = new Map(); const rootFields = typesCode.match(/^class RPC:\n([\s\S]*?)\n @staticmethod/m)?.[1] ?? ""; const rootFieldTypes = new Map(); for (const line of rootFields.split(/\r?\n/)) { const match = line.match(/^ ([A-Za-z_]\w*): ([A-Za-z_]\w*)\b/); if (match) { rootFieldTypes.set(match[1], match[2]); } } for (const defName of Object.keys(allDefinitions)) { const actualName = rootFieldTypes.get(toSnakeCase(defName)); if (actualName) { definitionAliases.set(defName.toLowerCase(), actualName); if (actualName !== defName && !actualTypeNames.has(defName.toLowerCase()) && /^[A-Za-z_]\w*$/.test(defName)) { publicTypeAliases.set(defName, actualName); } } } const compatibilityTypeAliases = new Map([ ["TaskInfoExecutionMode", "TaskExecutionMode"], ["TaskInfoStatus", "TaskStatus"], ["TaskInfoType", "TaskAgentProgressType"], ]); for (const [aliasName, targetName] of compatibilityTypeAliases) { if (actualTypeNames.has(targetName.toLowerCase()) && !actualTypeNames.has(aliasName.toLowerCase())) { publicTypeAliases.set(aliasName, actualTypeNames.get(targetName.toLowerCase()) ?? targetName); } } const resolveType = (name: string): string => actualTypeNames.get(name.toLowerCase()) ?? definitionAliases.get(name.toLowerCase()) ?? name; const annotatePythonTypes = (typeNames: Iterable, comment: string): void => { const annotated = new Set(); for (const typeName of typeNames) { const actualName = resolveType(typeName); if (annotated.has(actualName)) continue; let replaced = false; typesCode = typesCode.replace( new RegExp(`^(@dataclass\\n)?class ${actualName}[:(]`, "m"), (match) => { replaced = true; return `${comment}\n${match}`; } ); if (replaced) { annotated.add(actualName); } } }; annotatePythonTypes(experimentalTypeNames, pyExperimentalComment("type")); annotatePythonTypes(deprecatedTypeNames, "# Deprecated: this type is part of a deprecated API and will be removed in a future version."); annotatePythonTypes(internalTypeNames, "# Internal: this type is an internal SDK API and is not part of the public surface."); const lines: string[] = []; lines.push(`""" AUTO-GENERATED FILE - DO NOT EDIT Generated from: api.schema.json """ from __future__ import annotations from typing import TYPE_CHECKING ${[...externalRefs.imports.entries()] .map(([module, names]) => `from ${module} import ${[...names].sort().join(", ")}`) .join("\n")} if TYPE_CHECKING: from .._jsonrpc import JsonRpcClient from collections.abc import Callable from dataclasses import dataclass from datetime import datetime from enum import Enum from typing import Any, Protocol, TypeVar, cast from uuid import UUID import dateutil.parser T = TypeVar("T") EnumT = TypeVar("EnumT", bound=Enum) `); lines.push(typesCode); if (publicTypeAliases.size > 0) { lines.push(""); for (const [aliasName, targetName] of [...publicTypeAliases.entries()].sort(([left], [right]) => left.localeCompare(right), )) { lines.push(`${aliasName} = ${targetName}`); } } lines.push(` def _timeout_kwargs(timeout: float | None) -> dict: """Build keyword arguments for optional timeout forwarding.""" if timeout is not None: return {"timeout": timeout} return {} def _patch_model_capabilities(data: dict) -> dict: """Ensure model capabilities have required fields. TODO: Remove once the runtime schema correctly marks these fields as optional. Some models (e.g. embedding models) may omit 'limits' or 'supports' in their capabilities, or omit 'max_context_window_tokens' within limits. The generated deserializer requires these fields, so we supply defaults here. """ for model in data.get("models", []): caps = model.get("capabilities") if caps is None: model["capabilities"] = {"supports": {}, "limits": {"max_context_window_tokens": 0}} continue if "supports" not in caps: caps["supports"] = {} if "limits" not in caps: caps["limits"] = {"max_context_window_tokens": 0} elif "max_context_window_tokens" not in caps["limits"]: caps["limits"]["max_context_window_tokens"] = 0 return data `); // Emit RPC wrapper classes if (schema.server) { const publicNode = filterNodeByVisibility(schema.server, "public"); if (publicNode) emitRpcWrapper(lines, publicNode, false, resolveType, ""); const internalNode = filterNodeByVisibility(schema.server, "internal"); if (internalNode) emitRpcWrapper(lines, internalNode, false, resolveType, "_Internal"); } if (schema.session) { const publicNode = filterNodeByVisibility(schema.session, "public"); if (publicNode) emitRpcWrapper(lines, publicNode, true, resolveType, ""); const internalNode = filterNodeByVisibility(schema.session, "internal"); if (internalNode) emitRpcWrapper(lines, internalNode, true, resolveType, "_Internal"); } if (schema.clientSession) { emitClientSessionApiRegistration(lines, schema.clientSession, resolveType); } // Patch models.list to normalize capabilities before deserialization let finalCode = lines.join("\n"); finalCode = finalCode.replace( `ModelList.from_dict(await self._client.request("models.list"`, `ModelList.from_dict(_patch_model_capabilities(await self._client.request("models.list"`, ); // Close the extra paren opened by _patch_model_capabilities( // Match everything from _patch_model_capabilities( up to the end of the return statement finalCode = finalCode.replace( /(_patch_model_capabilities\(await self\._client\.request\("models\.list"[^)]*\)[^)]*\))/, "$1)", ); // Apply union rewrites to the assembled code so RPC method wrappers // generated after the types section also route Name.from_dict / to_class // through the discriminator dispatcher. finalCode = applyUnionRewritesToPython(finalCode, refBasedUnions); finalCode = postProcessDiscriminatorDefaultsForPython(finalCode, refBasedUnions); finalCode = unwrapRedundantPythonLambdas(finalCode); // Apply `_`-prefix to type names of internal RPC types so the leading-underscore // Python convention signals "internal, no stability guarantees" to consumers. { const internalDefs = new Set(); for (const [name, def] of Object.entries(rpcDefinitions.definitions)) { if (def && typeof def === "object" && (def as Record).visibility === "internal") { internalDefs.add(name); } } for (const [name, def] of Object.entries(rpcDefinitions.$defs)) { if (def && typeof def === "object" && (def as Record).visibility === "internal") { internalDefs.add(name); } } if (internalDefs.size > 0) { finalCode = renameInternalPythonSymbols(finalCode, internalDefs); } } // Annotate internal fields on otherwise-public RPC types with a `# Internal:` // comment immediately above the field declaration. Quicktype's generated // from_dict/to_dict reference field names in patterns that are brittle to // regex-based identifier rewriting, so we annotate rather than rename. The // marker is visible in IDE hovers and signals "internal, no stability // guarantee" without breaking the wire-protocol round-trip. { const combinedSchema: JSONSchema7 = { definitions: { ...(rpcDefinitions.definitions as Record), ...(rpcDefinitions.$defs as Record), }, }; const fieldsByType = collectInternalFieldsOnPublicTypes(combinedSchema); if (fieldsByType.size > 0) { finalCode = annotateInternalPythonFields(finalCode, fieldsByType, toSnakeCase); } } finalCode = appendPythonRpcAllList(finalCode, rpcDefinitions); const outPath = await writeGeneratedFile("python/copilot/generated/rpc.py", finalCode); console.log(` ✓ ${outPath}`); } /** * Appends an `__all__` list to the generated session-events module so that * the public ``copilot.session_events`` shim can ``from .generated.session_events * import *`` without leaking helper functions (``from_str``, ``from_int``, …) * or TypeVars (``T``, ``EnumT``). Internal-marked types are omitted so they * remain hidden from the SDK's public surface even though their renamed * (`_`-prefixed) form is still present in the module for cross-module use. */ function appendPythonSessionEventsAllList(code: string, _schema: JSONSchema7, internalTypeNames: Set): string { const exported = new Set(); // All top-level public classes (schema-derived and inline event payload // shapes alike). The codegen only emits classes that are part of the // protocol surface, so a class-presence filter is sufficient — the // utility module excludes helpers like `from_str` / `to_class` because // they are functions, not classes, and TypeVars are assignments. const classPattern = /^class\s+([A-Za-z_]\w*)\b/gm; let match: RegExpExecArray | null; while ((match = classPattern.exec(code)) !== null) { const name = match[1]; if (name.startsWith("_")) continue; if (internalTypeNames.has(name)) continue; exported.add(name); } // Top-level CamelCase Assign targets (e.g. `SessionEventData = X | Y | // ...` discriminated-union aliases). Skip TypeVars. const assignPattern = /^([A-Z][A-Za-z0-9_]*)\s*=/gm; while ((match = assignPattern.exec(code)) !== null) { const name = match[1]; if (name === "T" || name === "EnumT") continue; if (internalTypeNames.has(name)) continue; exported.add(name); } // Public top-level free functions named like `session_event_from_dict` // — the documented entry point for parsing event payloads from raw dicts. // Helper functions like `from_str` / `to_class` live in `utility` (a // different module) so they don't appear here. const fnPattern = /^def\s+([a-z][A-Za-z0-9_]*)\s*\(/gm; while ((match = fnPattern.exec(code)) !== null) { const name = match[1]; if (name.startsWith("_")) continue; if (!name.endsWith("_from_dict") && !name.endsWith("_to_dict")) continue; exported.add(name); } return code.replace(/\s*$/, "") + "\n\n" + renderPythonAllList([...exported].sort()) + "\n"; } /** * Appends an `__all__` list to the generated RPC module so that the public * ``copilot.rpc`` shim can ``from .generated.rpc import *`` without leaking * helper functions (``from_str``, ``from_int``, …) or TypeVars * (``T``, ``EnumT``). * * Shared types pulled in from session-events (via ``from .session_events * import …``) are intentionally excluded so each protocol type has a single * canonical public location. Callers reach them through * ``copilot.session_events.X`` — matching the C# codegen, which emits shared * types only in ``GitHub.Copilot`` and references them from * ``GitHub.Copilot.Rpc`` by fully-qualified name. */ function appendPythonRpcAllList(code: string, _definitions: { definitions: Record; $defs: Record }): string { const exported = new Set(); const classPattern = /^class\s+([A-Za-z_]\w*)\b/gm; let m: RegExpExecArray | null; while ((m = classPattern.exec(code)) !== null) { const name = m[1]; if (name.startsWith("_")) continue; exported.add(name); } const assignPattern = /^([A-Z][A-Za-z0-9_]*)\s*=/gm; while ((m = assignPattern.exec(code)) !== null) { const name = m[1]; if (name === "T" || name === "EnumT") continue; exported.add(name); } for (const helper of ["rpc_from_dict", "rpc_to_dict"]) { if (new RegExp(`^def\\s+${helper}\\b`, "m").test(code)) { exported.add(helper); } } return code.replace(/\s*$/, "") + "\n\n" + renderPythonAllList([...exported].sort()) + "\n"; } function renderPythonAllList(names: string[]): string { const lines: string[] = ["__all__ = ["]; for (const name of names) { lines.push(` ${JSON.stringify(name)},`); } lines.push("]"); return lines.join("\n"); } function collectPythonSessionEventExportedTypeNames(schema: JSONSchema7): Set { const definitions = collectDefinitionCollections(schema as Record); const definitionNames = new Set([...Object.keys(definitions.definitions), ...Object.keys(definitions.$defs)]); const code = generatePythonSessionEventsCode(schema); const exported = new Set(); const symbolPattern = /^(?:class\s+([A-Za-z_]\w*)\b|([A-Za-z_]\w*)\s*=)/gm; let match: RegExpExecArray | null; while ((match = symbolPattern.exec(code)) !== null) { const name = match[1] ?? match[2]; if (definitionNames.has(name)) { exported.add(name); } } return exported; } function emitPyApiGroup( lines: string[], apiName: string, node: Record, isSession: boolean, resolveType: (name: string) => string, groupExperimental: boolean, groupDeprecated: boolean = false, classPrefix: string = "" ): void { const subGroups = Object.entries(node).filter(([, v]) => typeof v === "object" && v !== null && !isRpcMethod(v)); // Emit sub-group classes first (Python needs definitions before use) for (const [subGroupName, subGroupNode] of subGroups) { const subApiName = apiName.replace(/Api$/, "") + toPascalCase(subGroupName) + "Api"; const subGroupExperimental = isNodeFullyExperimental(subGroupNode as Record); const subGroupDeprecated = isNodeFullyDeprecated(subGroupNode as Record); emitPyApiGroup(lines, subApiName, subGroupNode as Record, isSession, resolveType, subGroupExperimental, subGroupDeprecated, classPrefix); } // Emit this class if (groupDeprecated) { lines.push(`# Deprecated: this API group is deprecated and will be removed in a future version.`); } if (groupExperimental) { pushPyExperimentalApiGroupComment(lines); } lines.push(`class ${apiName}:`); if (isSession) { lines.push(` def __init__(self, client: "JsonRpcClient", session_id: str):`); lines.push(` self._client = client`); lines.push(` self._session_id = session_id`); for (const [subGroupName] of subGroups) { const subApiName = apiName.replace(/Api$/, "") + toPascalCase(subGroupName) + "Api"; lines.push(` self.${toSnakeCase(subGroupName)} = ${subApiName}(client, session_id)`); } } else { lines.push(` def __init__(self, client: "JsonRpcClient"):`); lines.push(` self._client = client`); for (const [subGroupName] of subGroups) { const subApiName = apiName.replace(/Api$/, "") + toPascalCase(subGroupName) + "Api"; lines.push(` self.${toSnakeCase(subGroupName)} = ${subApiName}(client)`); } } lines.push(``); for (const [key, value] of Object.entries(node)) { if (!isRpcMethod(value)) continue; emitMethod(lines, key, value, isSession, resolveType, groupExperimental, groupDeprecated); } lines.push(``); } function emitRpcWrapper(lines: string[], node: Record, isSession: boolean, resolveType: (name: string) => string, classPrefix: string = ""): void { const groups = Object.entries(node).filter(([, v]) => typeof v === "object" && v !== null && !isRpcMethod(v)); const topLevelMethods = Object.entries(node).filter(([, v]) => isRpcMethod(v)); const wrapperName = classPrefix + (isSession ? "SessionRpc" : "ServerRpc"); // Emit API classes for groups (recursively handles sub-groups) for (const [groupName, groupNode] of groups) { const prefix = classPrefix + (isSession ? "" : "Server"); const apiName = prefix + toPascalCase(groupName) + "Api"; const groupExperimental = isNodeFullyExperimental(groupNode as Record); const groupDeprecated = isNodeFullyDeprecated(groupNode as Record); emitPyApiGroup(lines, apiName, groupNode as Record, isSession, resolveType, groupExperimental, groupDeprecated, classPrefix); } // Emit wrapper class if (isSession) { lines.push(`class ${wrapperName}:`); lines.push(classPrefix === "_Internal" ? ` """Internal SDK session-scoped RPC methods. Not part of the public API."""` : ` """Typed session-scoped RPC methods."""`); lines.push(` def __init__(self, client: "JsonRpcClient", session_id: str):`); lines.push(` self._client = client`); lines.push(` self._session_id = session_id`); for (const [groupName] of groups) { lines.push(` self.${toSnakeCase(groupName)} = ${classPrefix}${toPascalCase(groupName)}Api(client, session_id)`); } } else { lines.push(`class ${wrapperName}:`); lines.push(classPrefix === "_Internal" ? ` """Internal SDK server-scoped RPC methods. Not part of the public API."""` : ` """Typed server-scoped RPC methods."""`); lines.push(` def __init__(self, client: "JsonRpcClient"):`); lines.push(` self._client = client`); for (const [groupName] of groups) { lines.push(` self.${toSnakeCase(groupName)} = ${classPrefix}Server${toPascalCase(groupName)}Api(client)`); } } lines.push(``); // Top-level methods for (const [key, value] of topLevelMethods) { if (!isRpcMethod(value)) continue; emitMethod(lines, key, value, isSession, resolveType, false); } lines.push(``); } function emitMethod(lines: string[], name: string, method: RpcMethod, isSession: boolean, resolveType: (name: string) => string, groupExperimental = false, groupDeprecated = false): void { const isInternal = method.visibility === "internal"; const methodName = (isInternal ? "_" : "") + toSnakeCase(name); const resultSchema = getMethodResultSchema(method); const nullableInner = resultSchema ? getNullableInner(resultSchema) : undefined; const effectiveResultSchema = nullableInner ?? resultSchema; const hasResult = !isVoidSchema(resultSchema) && !nullableInner; const hasNullableResult = !!nullableInner; const resultIsOpaque = isOpaqueJson(effectiveResultSchema); const resultIsObject = !resultIsOpaque && isPythonObjectResultSchema(effectiveResultSchema); let resultType: string; if (hasNullableResult) { const innerTypeName = resolveType(pythonResultTypeName(method, nullableInner)); resultType = `${innerTypeName} | None`; } else if (hasResult) { resultType = resolveType(pythonResultTypeName(method)); } else { resultType = "None"; } const effectiveParams = getMethodParamsSchema(method); const paramProps = effectiveParams?.properties || {}; const nonSessionParams = Object.keys(paramProps).filter((k) => k !== "sessionId"); const hasParams = isSession ? nonSessionParams.length > 0 : hasSchemaPayload(effectiveParams); const paramsType = resolveType(pythonParamsTypeName(method)); const paramsOptional = isParamsOptional(method); // Build signature with typed params + optional timeout const sig = hasParams ? paramsOptional ? ` async def ${methodName}(self, params: ${paramsType} | None = None, *, timeout: float | None = None) -> ${resultType}:` : ` async def ${methodName}(self, params: ${paramsType}, *, timeout: float | None = None) -> ${resultType}:` : ` async def ${methodName}(self, *, timeout: float | None = None) -> ${resultType}:`; lines.push(sig); pushPyRpcMethodDocstring(lines, " ", method, { paramsName: hasParams ? "params" : undefined, paramsDescription: rpcParamsDescription(method, effectiveParams), resultDescription: rpcResultDescription(method, resultSchema), deprecated: method.deprecated && !groupDeprecated, experimental: method.stability === "experimental" && !groupExperimental, internal: method.visibility === "internal", }); // Deserialize helper const innerTypeName = hasNullableResult ? resolveType(pythonResultTypeName(method, nullableInner)) : resultType; const isAnyType = innerTypeName === "Any"; const deserialize = (expr: string) => { if (resultIsOpaque || isAnyType) { return expr; } if (hasNullableResult) { return resultIsObject ? `${innerTypeName}.from_dict(${expr}) if ${expr} is not None else None` : `${innerTypeName}(${expr}) if ${expr} is not None else None`; } return resultIsObject ? `${innerTypeName}.from_dict(${expr})` : `${innerTypeName}(${expr})`; }; // Build request body with proper serialization/deserialization const emitRequestCall = (paramsExpr: string) => { const callExpr = `await self._client.request("${method.rpcMethod}", ${paramsExpr}, **_timeout_kwargs(timeout))`; if (hasResult || hasNullableResult) { if (hasNullableResult) { lines.push(` _result = ${callExpr}`); lines.push(` return ${deserialize("_result")}`); } else { lines.push(` return ${deserialize(callExpr)}`); } } else { lines.push(` ${callExpr}`); } }; if (isSession) { if (hasParams) { if (paramsOptional) { lines.push(` params_dict: dict[str, Any] = {k: v for k, v in params.to_dict().items() if v is not None} if params is not None else {}`); } else { lines.push(` params_dict: dict[str, Any] = {k: v for k, v in params.to_dict().items() if v is not None}`); } lines.push(` params_dict["sessionId"] = self._session_id`); emitRequestCall("params_dict"); } else { emitRequestCall(`{"sessionId": self._session_id}`); } } else { if (hasParams) { if (paramsOptional) { lines.push(` params_dict = {k: v for k, v in params.to_dict().items() if v is not None} if params is not None else {}`); } else { lines.push(` params_dict = {k: v for k, v in params.to_dict().items() if v is not None}`); } emitRequestCall("params_dict"); } else { emitRequestCall("{}"); } } lines.push(``); } function clientSessionHandlerMethodName(rpcMethod: string): string { const parts = rpcMethod.split("."); return toSnakeCase(parts[parts.length - 1]); } function emitClientSessionApiRegistration( lines: string[], node: Record, resolveType: (name: string) => string ): void { const groups = Object.entries(node).filter(([, value]) => typeof value === "object" && value !== null && !isRpcMethod(value)); for (const [groupName, groupNode] of groups) { const handlerName = `${toPascalCase(groupName)}Handler`; const groupExperimental = isNodeFullyExperimental(groupNode as Record); const groupDeprecated = isNodeFullyDeprecated(groupNode as Record); if (groupDeprecated) { lines.push(`# Deprecated: this API group is deprecated and will be removed in a future version.`); } if (groupExperimental) { pushPyExperimentalApiGroupComment(lines); } lines.push(`class ${handlerName}(Protocol):`); const methods = collectRpcMethods(groupNode as Record); for (const method of methods) { emitClientSessionHandlerMethod(lines, method, resolveType, groupExperimental, groupDeprecated); } lines.push(``); } lines.push(`@dataclass`); lines.push(`class ClientSessionApiHandlers:`); if (groups.length === 0) { lines.push(` pass`); } else { for (const [groupName] of groups) { lines.push(` ${toSnakeCase(groupName)}: ${toPascalCase(groupName)}Handler | None = None`); } } lines.push(``); lines.push(`def register_client_session_api_handlers(`); lines.push(` client: "JsonRpcClient",`); lines.push(` get_handlers: Callable[[str], ClientSessionApiHandlers],`); lines.push(`) -> None:`); lines.push(` """Register client-session request handlers on a JSON-RPC connection."""`); if (groups.length === 0) { lines.push(` return`); } else { for (const [groupName, groupNode] of groups) { const methods = collectRpcMethods(groupNode as Record); for (const method of methods) { emitClientSessionRegistrationMethod( lines, groupName, method, resolveType ); } } } lines.push(``); } function emitClientSessionHandlerMethod( lines: string[], method: RpcMethod, resolveType: (name: string) => string, groupExperimental = false, groupDeprecated = false ): void { const paramsType = resolveType(pythonParamsTypeName(method)); const resultSchema = getMethodResultSchema(method); const nullableInner = resultSchema ? getNullableInner(resultSchema) : undefined; let resultType: string; if (nullableInner) { resultType = `${resolveType(pythonResultTypeName(method, nullableInner))} | None`; } else if (!isVoidSchema(resultSchema)) { resultType = resolveType(pythonResultTypeName(method)); } else { resultType = "None"; } const methodName = clientSessionHandlerMethodName(method.rpcMethod); lines.push(` async def ${methodName}(self, params: ${paramsType}) -> ${resultType}:`); pushPyRpcMethodDocstring(lines, " ", method, { paramsName: "params", paramsDescription: rpcParamsDescription(method, getMethodParamsSchema(method)), resultDescription: rpcResultDescription(method, resultSchema), deprecated: method.deprecated && !groupDeprecated, experimental: method.stability === "experimental" && !groupExperimental, }); lines.push(` pass`); } function emitClientSessionRegistrationMethod( lines: string[], groupName: string, method: RpcMethod, resolveType: (name: string) => string ): void { const rpcSegments = method.rpcMethod.split("."); const handlerVariableName = `handle_${rpcSegments.map(toSnakeCase).join("_")}`; const paramsType = resolveType(pythonParamsTypeName(method)); const resultSchema = getMethodResultSchema(method); const nullableInner = resultSchema ? getNullableInner(resultSchema) : undefined; const hasResult = !isVoidSchema(resultSchema) && !nullableInner; const handlerField = toSnakeCase(groupName); const handlerMethod = clientSessionHandlerMethodName(method.rpcMethod); lines.push(` async def ${handlerVariableName}(params: dict) -> dict | None:`); lines.push(` request = ${paramsType}.from_dict(params)`); lines.push(` handler = get_handlers(request.session_id).${handlerField}`); lines.push( ` if handler is None: raise RuntimeError(f"No ${handlerField} handler registered for session: {request.session_id}")` ); if (hasResult) { lines.push(` result = await handler.${handlerMethod}(request)`); if (isObjectSchema(resultSchema)) { lines.push(` return result.to_dict()`); } else { lines.push(` return result.value if hasattr(result, 'value') else result`); } } else if (nullableInner) { lines.push(` result = await handler.${handlerMethod}(request)`); const resolvedInner = resolveSchema(nullableInner, rpcDefinitions) ?? nullableInner; if (isObjectSchema(resolvedInner) || nullableInner.$ref) { lines.push(` return result.to_dict() if result is not None else None`); } else { lines.push(` return result`); } } else { lines.push(` await handler.${handlerMethod}(request)`); lines.push(` return None`); } lines.push(` client.set_request_handler("${method.rpcMethod}", ${handlerVariableName})`); } // ── Main ──────────────────────────────────────────────────────────────────── async function generate(sessionSchemaPath?: string, apiSchemaPath?: string): Promise { await generateSessionEvents(sessionSchemaPath); try { const resolvedSessionPath = sessionSchemaPath ?? (await getSessionEventsSchemaPath()); const sessionSchema = postProcessSchema(cloneSchemaForCodegen((await loadSchemaJson(resolvedSessionPath)) as JSONSchema7)); await generateRpc(apiSchemaPath, sessionSchema); } catch (err) { if ((err as NodeJS.ErrnoException).code === "ENOENT" && !apiSchemaPath) { console.log("Python: skipping RPC (api.schema.json not found)"); } else { throw err; } } } const __filename = fileURLToPath(import.meta.url); if (process.argv[1] && path.resolve(process.argv[1]) === __filename) { const sessionArg = process.argv[2] || undefined; const apiArg = process.argv[3] || undefined; generate(sessionArg, apiArg).catch((err) => { console.error("Python generation failed:", err); process.exit(1); }); }