Skip to content

feat(bigframes): UDF transpiler handles some control flow#17558

Draft
TrevorBergeron wants to merge 7 commits into
mainfrom
tbergeron_more_py_funcs
Draft

feat(bigframes): UDF transpiler handles some control flow#17558
TrevorBergeron wants to merge 7 commits into
mainfrom
tbergeron_more_py_funcs

Conversation

@TrevorBergeron

Copy link
Copy Markdown
Contributor

Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:

  • Make sure to open an issue as a bug/issue before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea
  • Ensure the tests and linter pass
  • Code coverage does not decrease (if any source code was changed)
  • Appropriate docs were updated (if necessary)

Fixes #<issue_number_goes_here> 🦕

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces control flow graph (CFG) construction and topological sorting to transpile Python bytecode with conditional branches and jumps into BigFrames expressions, supported by extensive unit tests. The review feedback highlights several critical improvements, including handling the JUMP_BACKWARD_NO_INTERRUPT instruction to prevent loops from being silently compiled, optimizing CFG building by precomputing instruction offsets to avoid O(N^2) sorting overhead, removing dead code for STORE_FAST, and correcting a misleading stack underflow error message.

Comment on lines +90 to +117
def get_block_successors(
block: BasicBlock, instructions_by_offset: dict[int, dis.Instruction]
) -> list[int]:
if not block.instructions:
return []
last_inst = block.instructions[-1]
opname = last_inst.opname
offset = last_inst.offset

sorted_offsets = sorted(instructions_by_offset.keys())
idx = sorted_offsets.index(offset)
next_offset = sorted_offsets[idx + 1] if idx + 1 < len(sorted_offsets) else None

if opname in ("RETURN_VALUE", "RETURN_CONST"):
return []

if opname in ("JUMP_FORWARD", "JUMP_ABSOLUTE", "JUMP_BACKWARD"):
return [last_inst.argval]

if "JUMP" in opname and ("IF" in opname or "OR_POP" in opname):
successors = [last_inst.argval]
if next_offset is not None:
successors.append(next_offset)
return successors

if next_offset is not None:
return [next_offset]
return []

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The unconditional jump instruction JUMP_BACKWARD_NO_INTERRUPT (introduced in Python 3.11) is not handled in get_block_successors. It falls through to the default next_offset handling, which is incorrect because it is an unconditional jump. This will cause the CFG to omit the backward edge, preventing topological_sort from detecting loops and leading to silent compilation of loops into incorrect linear code or runtime errors.

Additionally, repeatedly sorting and calling .index() on instructions_by_offset inside get_block_successors is inefficient ($O(N^2)$). We can optimize this by precomputing next_offsets once and passing it.

Note: Since this modifies code generation logic, please manually verify the diffs of the generated output against established golden files to ensure that changes are deterministic and do not introduce unintended regressions.

def get_block_successors(
    block: BasicBlock, next_offsets: dict[int, int]
) -> list[int]:
    if not block.instructions:
        return []
    last_inst = block.instructions[-1]
    opname = last_inst.opname
    offset = last_inst.offset

    next_offset = next_offsets.get(offset)

    if opname in ("RETURN_VALUE", "RETURN_CONST"):
        return []

    if opname in ("JUMP_FORWARD", "JUMP_ABSOLUTE", "JUMP_BACKWARD", "JUMP_BACKWARD_NO_INTERRUPT"):
        return [last_inst.argval]

    if "JUMP" in opname and ("IF" in opname or "OR_POP" in opname):
        successors = [last_inst.argval]
        if next_offset is not None:
            successors.append(next_offset)
        return successors

    if next_offset is not None:
        return [next_offset]
    return []
References
  1. When modifying code generation logic, manually verify the diffs of the generated output against established golden files to ensure that changes are deterministic and do not introduce unintended regressions.

Comment on lines +120 to +140
def build_cfg(instructions: list[dis.Instruction]) -> dict[int, BasicBlock]:
starts = sorted(list(get_block_starts(instructions)))
instructions_by_offset = {inst.offset: inst for inst in instructions}

blocks: dict[int, BasicBlock] = {}
for i, start in enumerate(starts):
end = starts[i + 1] if i + 1 < len(starts) else None
block_insts = [
inst
for inst in instructions
if start <= inst.offset and (end is None or inst.offset < end)
]
blocks[start] = BasicBlock(start_offset=start, instructions=block_insts)

for block in blocks.values():
successors = get_block_successors(block, instructions_by_offset)
block.successors = successors
for succ in successors:
blocks[succ].predecessors.append(block.start_offset)

return blocks

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Optimize build_cfg by precomputing next_offsets in $O(N)$ time and passing it to get_block_successors to avoid redundant sorting and lookup overhead.

Note: Since this modifies code generation logic, please manually verify the diffs of the generated output against established golden files to ensure that changes are deterministic and do not introduce unintended regressions.

def build_cfg(instructions: list[dis.Instruction]) -> dict[int, BasicBlock]:
    starts = sorted(list(get_block_starts(instructions)))
    next_offsets = {inst.offset: next_inst.offset for inst, next_inst in zip(instructions, instructions[1:])}

    blocks: dict[int, BasicBlock] = {}
    for i, start in enumerate(starts):
        end = starts[i + 1] if i + 1 < len(starts) else None
        block_insts = [
            inst
            for inst in instructions
            if start <= inst.offset and (end is None or inst.offset < end)
        ]
        blocks[start] = BasicBlock(start_offset=start, instructions=block_insts)

    for block in blocks.values():
        successors = get_block_successors(block, next_offsets)
        block.successors = successors
        for succ in successors:
            blocks[succ].predecessors.append(block.start_offset)

    return blocks
References
  1. When modifying code generation logic, manually verify the diffs of the generated output against established golden files to ensure that changes are deterministic and do not introduce unintended regressions.

Comment on lines 181 to +185
def _compile_bytecode_to_py_expr(func: Callable) -> expression.Expression:
instructions = list(dis.get_instructions(func))

stack: list[expression.Expression] = []
blocks = build_cfg(instructions)
order = topological_sort(blocks)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Precompute next_offsets once at the beginning of _compile_bytecode_to_py_expr so it can be reused across the function to avoid rebuilding and sorting instructions_by_offset inside loops.

Note: Since this modifies code generation logic, please manually verify the diffs of the generated output against established golden files to ensure that changes are deterministic and do not introduce unintended regressions.

Suggested change
def _compile_bytecode_to_py_expr(func: Callable) -> expression.Expression:
instructions = list(dis.get_instructions(func))
stack: list[expression.Expression] = []
blocks = build_cfg(instructions)
order = topological_sort(blocks)
def _compile_bytecode_to_py_expr(func: Callable) -> expression.Expression:
instructions = list(dis.get_instructions(func))
next_offsets = {inst.offset: next_inst.offset for inst, next_inst in zip(instructions, instructions[1:])}
blocks = build_cfg(instructions)
order = topological_sort(blocks)
References
  1. When modifying code generation logic, manually verify the diffs of the generated output against established golden files to ensure that changes are deterministic and do not introduce unintended regressions.

Comment on lines +532 to +539
instructions_by_offset = {i.offset: i for i in instructions}
sorted_offsets = sorted(instructions_by_offset.keys())
idx = sorted_offsets.index(inst.offset)
next_offset = (
sorted_offsets[idx + 1]
if idx + 1 < len(sorted_offsets)
else None
)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Avoid rebuilding and sorting instructions_by_offset inside the instruction loop by using the precomputed next_offsets dictionary.

Note: Since this modifies code generation logic, please manually verify the diffs of the generated output against established golden files to ensure that changes are deterministic and do not introduce unintended regressions.

Suggested change
instructions_by_offset = {i.offset: i for i in instructions}
sorted_offsets = sorted(instructions_by_offset.keys())
idx = sorted_offsets.index(inst.offset)
next_offset = (
sorted_offsets[idx + 1]
if idx + 1 < len(sorted_offsets)
else None
)
next_offset = next_offsets.get(inst.offset)
References
  1. When modifying code generation logic, manually verify the diffs of the generated output against established golden files to ensure that changes are deterministic and do not introduce unintended regressions.

Comment on lines +584 to +591
instructions_by_offset = {i.offset: i for i in instructions}
sorted_offsets = sorted(instructions_by_offset.keys())
idx = sorted_offsets.index(block.instructions[-1].offset)
next_offset = (
sorted_offsets[idx + 1] if idx + 1 < len(sorted_offsets) else None
)
if next_offset is not None:
edge_conditions[(offset, next_offset)] = reach_cond

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Avoid rebuilding and sorting instructions_by_offset at the end of the block loop by using the precomputed next_offsets dictionary.

Note: Since this modifies code generation logic, please manually verify the diffs of the generated output against established golden files to ensure that changes are deterministic and do not introduce unintended regressions.

Suggested change
instructions_by_offset = {i.offset: i for i in instructions}
sorted_offsets = sorted(instructions_by_offset.keys())
idx = sorted_offsets.index(block.instructions[-1].offset)
next_offset = (
sorted_offsets[idx + 1] if idx + 1 < len(sorted_offsets) else None
)
if next_offset is not None:
edge_conditions[(offset, next_offset)] = reach_cond
next_offset = next_offsets.get(block.instructions[-1].offset)
if next_offset is not None:
edge_conditions[(offset, next_offset)] = reach_cond
References
  1. When modifying code generation logic, manually verify the diffs of the generated output against established golden files to ensure that changes are deterministic and do not introduce unintended regressions.

Comment on lines +517 to +519
elif opname in ("STORE_FAST", "POP_TOP"):
if stack:
stack.pop()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

STORE_FAST is already fully handled on line 308. Because of the if/elif structure, the block on line 517 is dead code for STORE_FAST. We should simplify this block to only handle POP_TOP.

Note: Since this modifies code generation logic, please manually verify the diffs of the generated output against established golden files to ensure that changes are deterministic and do not introduce unintended regressions.

Suggested change
elif opname in ("STORE_FAST", "POP_TOP"):
if stack:
stack.pop()
elif opname == "POP_TOP":
if stack:
stack.pop()
References
  1. When modifying code generation logic, manually verify the diffs of the generated output against established golden files to ensure that changes are deterministic and do not introduce unintended regressions.

Comment on lines +486 to +487
if len(stack) < num_args:
raise ValueError("Stack has < 2 elements")

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The error message for stack underflow during a function call is hardcoded to "Stack has < 2 elements". This is misleading because the function call might expect a different number of arguments (num_args). Let's make it dynamic.

Note: Since this modifies code generation logic, please manually verify the diffs of the generated output against established golden files to ensure that changes are deterministic and do not introduce unintended regressions.

Suggested change
if len(stack) < num_args:
raise ValueError("Stack has < 2 elements")
if len(stack) < num_args:
raise ValueError(f"Stack has fewer than {num_args} elements")
References
  1. When modifying code generation logic, manually verify the diffs of the generated output against established golden files to ensure that changes are deterministic and do not introduce unintended regressions.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant