forked from MatthieuDartiailh/bytecode
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathflags.py
More file actions
230 lines (200 loc) · 7.4 KB
/
flags.py
File metadata and controls
230 lines (200 loc) · 7.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
import opcode as _opcode
from enum import IntFlag
from typing import Optional
# alias to keep the 'bytecode' variable free
import bytecode as _bytecode
from .instr import DUAL_ARG_OPCODES, RESUME_OPCODE, CellVar, FreeVar
from .utils import PY312, PY313, PY314
class CompilerFlags(IntFlag):
"""Possible values of the co_flags attribute of Code object.
Note: We do not rely on inspect values here as some of them are missing and
furthermore would be version dependent.
"""
OPTIMIZED = 0x00001
NEWLOCALS = 0x00002
VARARGS = 0x00004
VARKEYWORDS = 0x00008
NESTED = 0x00010
GENERATOR = 0x00020
NOFREE = 0x00040
# New in Python 3.5
# Used for coroutines defined using async def ie native coroutine
COROUTINE = 0x00080
# Used for coroutines defined as a generator and then decorated using
# types.coroutine
ITERABLE_COROUTINE = 0x00100
# New in Python 3.6
# Generator defined in an async def function
ASYNC_GENERATOR = 0x00200
FUTURE_GENERATOR_STOP = 0x800000
FUTURE_ANNOTATIONS = 0x1000000
UNOPTIMIZED_OPCODES = (
_opcode.opmap["STORE_NAME"],
_opcode.opmap["LOAD_NAME"],
_opcode.opmap["DELETE_NAME"],
)
ASYNC_OPCODES = (
_opcode.opmap["GET_AWAITABLE"],
_opcode.opmap["GET_AITER"],
_opcode.opmap["GET_ANEXT"],
*((_opcode.opmap["BEFORE_ASYNC_WITH"],) if not PY314 else ()), # Removed in 3.14+
_opcode.opmap["END_ASYNC_FOR"],
*((_opcode.opmap["ASYNC_GEN_WRAP"],) if not PY312 else ()), # New in 3.11
)
YIELD_VALUE_OPCODE = _opcode.opmap["YIELD_VALUE"]
GENERATOR_LIKE_OPCODES = (
_opcode.opmap["RETURN_GENERATOR"], # Added in 3.11+
)
def infer_flags(
bytecode: "_bytecode.Bytecode |_bytecode.ConcreteBytecode |_bytecode.ControlFlowGraph",
is_async: bool | None = None,
):
"""Infer the proper flags for a bytecode based on the instructions.
Because the bytecode does not have enough context to guess if a function
is asynchronous the algorithm tries to be conservative and will never turn
a previously async code into a sync one.
Parameters
----------
bytecode : Bytecode | ConcreteBytecode | ControlFlowGraph
Bytecode for which to infer the proper flags
is_async : bool | None, optional
Force the code to be marked as asynchronous if True, prevent it from
being marked as asynchronous if False and simply infer the best
solution based on the opcode and the existing flag if None.
"""
flags = CompilerFlags(0)
if not isinstance(
bytecode,
(_bytecode.Bytecode, _bytecode.ConcreteBytecode, _bytecode.ControlFlowGraph),
):
msg = (
"Expected a Bytecode, ConcreteBytecode or ControlFlowGraph instance not %s"
)
raise ValueError(msg % bytecode)
instructions = (
bytecode._get_instructions()
if isinstance(bytecode, _bytecode.ControlFlowGraph)
else bytecode
)
# Iterate over the instructions and inspect the arguments
is_concrete = isinstance(bytecode, _bytecode.ConcreteBytecode)
optimized = True
has_free = False if not is_concrete else bytecode.cellvars and bytecode.freevars
known_async = False
known_generator = False
possible_generator = False
instr_iter = iter(instructions)
for instr in instr_iter:
if isinstance(
instr,
(
_bytecode.SetLineno,
_bytecode.Label,
_bytecode.TryBegin,
_bytecode.TryEnd,
),
):
continue
opcode = instr.opcode
if opcode in UNOPTIMIZED_OPCODES:
optimized = False
elif opcode in ASYNC_OPCODES:
known_async = True
elif opcode == YIELD_VALUE_OPCODE:
while isinstance(
ni := next(instr_iter),
(
_bytecode.SetLineno,
_bytecode.Label,
_bytecode.TryBegin,
_bytecode.TryEnd,
),
):
pass
assert ni._opcode == RESUME_OPCODE
if (ni.arg & 3) != 3:
known_generator = True
else:
known_async = True
elif opcode in GENERATOR_LIKE_OPCODES:
possible_generator = True
elif opcode in _opcode.hasfree:
has_free = True
elif (
not is_concrete
and opcode in DUAL_ARG_OPCODES
and (isinstance(instr.arg[0], CellVar) or isinstance(instr.arg[1], CellVar))
):
has_free = True
elif (
PY313
and opcode in _opcode.haslocal
and isinstance(instr.arg, (CellVar, FreeVar))
):
has_free = True
# Identify optimized code
if optimized:
flags |= CompilerFlags.OPTIMIZED
# Check for free variables
if not has_free:
flags |= CompilerFlags.NOFREE
# Copy flags for which we cannot infer the right value
flags |= bytecode.flags & (
CompilerFlags.NEWLOCALS
| CompilerFlags.VARARGS
| CompilerFlags.VARKEYWORDS
| CompilerFlags.NESTED
)
# If performing inference or forcing an async behavior, first inspect
# the flags since this is the only way to identify iterable coroutines
if is_async in (None, True):
if (
bytecode.flags & CompilerFlags.COROUTINE
or bytecode.flags & CompilerFlags.ASYNC_GENERATOR
):
if known_generator:
flags |= CompilerFlags.ASYNC_GENERATOR
else:
flags |= CompilerFlags.COROUTINE
elif bytecode.flags & CompilerFlags.ITERABLE_COROUTINE:
if known_async:
msg = (
"The ITERABLE_COROUTINE flag is set but bytecode that"
"can only be used in async functions have been "
"detected. Please unset that flag before performing "
"inference."
)
raise ValueError(msg)
flags |= CompilerFlags.ITERABLE_COROUTINE
# If the code was not asynchronous before determine if it should now be
# asynchronous based on the opcode and the is_async argument.
else:
if known_async:
# YIELD_FROM is not allowed in async generator
if known_generator:
flags |= CompilerFlags.ASYNC_GENERATOR
else:
flags |= CompilerFlags.COROUTINE
elif known_generator or possible_generator:
if is_async:
if known_generator:
flags |= CompilerFlags.ASYNC_GENERATOR
else:
flags |= CompilerFlags.COROUTINE
else:
flags |= CompilerFlags.GENERATOR
elif is_async:
flags |= CompilerFlags.COROUTINE
# If the code should not be asynchronous, check first it is possible and
# next set the GENERATOR flag if relevant
else:
if known_async:
raise ValueError(
"The is_async argument is False but bytecodes "
"that can only be used in async functions have "
"been detected."
)
if known_generator or possible_generator:
flags |= CompilerFlags.GENERATOR
flags |= bytecode.flags & CompilerFlags.FUTURE_GENERATOR_STOP
return flags