Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Rework FOR_ITER specialization in the free-threaded build to only apply to
uniquely referenced iterators. This handles the common case of 'for item in
seq' (where 'seq' is a list, tuple or range object) and 'for item in
generator_function()', but not, for example, 'g = gen(...); for item in g:'.
  • Loading branch information
Yhg1s committed Jan 21, 2025
commit cf5fcd01ec39b5f0e8e57ba5ea24cc70eac5ef62
2 changes: 1 addition & 1 deletion Include/internal/pycore_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ extern void _PyList_DebugMallocStats(FILE *out);
extern PyObject* _PyList_GetItemRef(PyListObject *, Py_ssize_t i);
#ifdef Py_GIL_DISABLED
// Returns -1 in case of races with other threads.
extern int _PyList_GetItemRefNoLock(PyListObject *, Py_ssize_t i, PyObject ** result);
extern int _PyList_GetItemRefNoLock(PyListObject *, Py_ssize_t, PyObject **);
Comment thread
Yhg1s marked this conversation as resolved.
Outdated
#endif

#define _PyList_ITEMS(op) _Py_RVALUE(_PyList_CAST(op)->ob_item)
Expand Down
2 changes: 1 addition & 1 deletion Include/internal/pycore_uop_metadata.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

155 changes: 155 additions & 0 deletions Lib/test/test_free_threading/test_iteration_deopt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
import dis
import queue
import threading
import time
import unittest
from test.support import (import_helper, cpython_only, Py_GIL_DISABLED,
requires_specialization_ft)

_testinternalcapi = import_helper.import_module("_testinternalcapi")
_testlimitedcapi = import_helper.import_module("_testlimitedcapi")

NUMTHREADS = 5

def get_tlbc_instructions(f):
co = dis._get_code_object(f)
tlbc = _testinternalcapi.get_tlbc(co)
return [i.opname for i in dis._get_instructions_bytes(tlbc)]


class IterationDeoptTests(unittest.TestCase):
def check_deopt(self, get_iter, opcode, is_generator=False):
input = range(100)
expected_len = len(input)
q = queue.Queue()
barrier = threading.Barrier(NUMTHREADS + 1)
done = threading.Event()
def worker():
# A complicated dance to get a weak reference to an iterator
# _only_ (strongly) referenced by the for loop, so that we can
# force our loop to deopt mid-way through.
it = get_iter(input)
ref = _testlimitedcapi.pylong_fromvoidptr(it)
Comment thread
Yhg1s marked this conversation as resolved.
Outdated
q.put(ref)
# We can't use enumerate without affecting the loop, so keep a
# manual counter.
i = 0
loop_a_little_more = 5
results = []
try:
# Make sure we're not still specialized from a previous run.
ops = get_tlbc_instructions(worker)
self.assertIn('FOR_ITER', ops)
self.assertNotIn(opcode, ops)
for item in it:
results.append(item)
i += 1

# We have to be very careful exiting the loop, because
# if the main thread hasn't dereferenced the unsafe
# weakref to our iterator yet, exiting will make it
# invalid and cause a crash. Getting the timing right is
# difficult, though, since it depends on the OS
# scheduler and the system load. As a final safeguard,
# if we're close to finishing the loop, just wait for the
# main thread.
if i + loop_a_little_more > expected_len:
done.wait()

if i == 1:
del it
# Warm up. The first iteration didn't count because of
# the extra reference to the iterator.
if i < 10:
continue
if i == 10:
ops = get_tlbc_instructions(worker)
self.assertIn(opcode, ops)
# Let the main thread know it's time to reference our iterator.
barrier.wait()
continue
# Continue iterating while at any time our loop may be
# forced to deopt, but try to get the thread scheduler
# to give the main thread a chance to run.
if not done.is_set():
time.sleep(0)
continue
if loop_a_little_more:
# Loop a little more after 'done' is set to make sure we
# introduce a tsan-detectable race if the loop isn't
# deopting appropriately.
loop_a_little_more -= 1
continue
break
self.assertEqual(results, list(input)[:i])
except threading.BrokenBarrierError:
return
except Exception as e:
# In case the exception happened before the last barrier,
# reset it so nothing is left hanging.
barrier.reset()
# In case it's the final assertion that failed, just add it
# to the result queue so it'll show up in the normal test
# output.
q.put(e)
raise
q.put("SUCCESS")
# Reset specialization and thread-local bytecode from previous runs.
worker.__code__ = worker.__code__.replace()
threads = [threading.Thread(target=worker) for i in range(NUMTHREADS)]
for t in threads:
t.start()
# Get the "weakrefs" from the worker threads.
refs = [q.get() for i in range(NUMTHREADS)]
# Wait for each thread to finish its specialization check.
barrier.wait()
# Dereference the "weakrefs" we were sent in an extremely unsafe way.
iterators = [_testlimitedcapi.pylong_asvoidptr(ref) for ref in refs]
done.set()
self.assertNotIn(None, iterators)
# Read data that the iteration writes, to trigger data races if they
# don't deopt appropriately.
if is_generator:
for it in iterators:
it.gi_running
else:
for it in iterators:
it.__reduce__()
for t in threads:
t.join()
results = [q.get() for i in range(NUMTHREADS)]
self.assertEqual(results, ["SUCCESS"] * NUMTHREADS)

@cpython_only
@requires_specialization_ft
@unittest.skipIf(not Py_GIL_DISABLED, "requires free-threading")
def test_deopt_leaking_iterator_list(self):
def make_list_iter(input):
return iter(list(input))
self.check_deopt(make_list_iter, 'FOR_ITER_LIST')

@cpython_only
@requires_specialization_ft
@unittest.skipIf(not Py_GIL_DISABLED, "requires free-threading")
def test_deopt_leaking_iterator_tuple(self):
def make_tuple_iter(input):
return iter(tuple(input))
self.check_deopt(make_tuple_iter, 'FOR_ITER_TUPLE')

@cpython_only
@requires_specialization_ft
@unittest.skipIf(not Py_GIL_DISABLED, "requires free-threading")
def test_deopt_leaking_iterator_range(self):
def make_range_iter(input):
return iter(input)
self.check_deopt(make_range_iter, 'FOR_ITER_RANGE')

@cpython_only
@requires_specialization_ft
@unittest.skipIf(not Py_GIL_DISABLED, "requires free-threading")
def test_deopt_leaking_iterator_generator(self):
def gen(input):
for item in input:
yield item
self.check_deopt(gen, 'FOR_ITER_GEN', is_generator=True)

38 changes: 38 additions & 0 deletions Lib/test/test_opcache.py
Original file line number Diff line number Diff line change
Expand Up @@ -1631,6 +1631,44 @@ def compare_op_str():
self.assert_specialized(compare_op_str, "COMPARE_OP_STR")
self.assert_no_opcode(compare_op_str, "COMPARE_OP")

@cpython_only
@requires_specialization_ft
def test_for_iter(self):
L = list(range(10))
def for_iter_list():
for i in L:
self.assertIn(i, L)

for_iter_list()
self.assert_specialized(for_iter_list, "FOR_ITER_LIST")
self.assert_no_opcode(for_iter_list, "FOR_ITER")

t = tuple(range(10))
def for_iter_tuple():
for i in t:
self.assertIn(i, t)

for_iter_tuple()
self.assert_specialized(for_iter_tuple, "FOR_ITER_TUPLE")
self.assert_no_opcode(for_iter_tuple, "FOR_ITER")

r = range(10)
def for_iter_range():
for i in r:
self.assertIn(i, r)

for_iter_range()
self.assert_specialized(for_iter_range, "FOR_ITER_RANGE")
self.assert_no_opcode(for_iter_range, "FOR_ITER")

def for_iter_generator():
for i in (i for i in range(10)):
i + 1

for_iter_generator()
self.assert_specialized(for_iter_generator, "FOR_ITER_GEN")
self.assert_no_opcode(for_iter_generator, "FOR_ITER")


if __name__ == "__main__":
unittest.main()
28 changes: 11 additions & 17 deletions Objects/rangeobject.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
#include "pycore_modsupport.h" // _PyArg_NoKwnames()
#include "pycore_range.h"
#include "pycore_tuple.h" // _PyTuple_ITEMS()
#include "pycore_pyatomic_ft_wrappers.h"


/* Support objects whose length is > PY_SSIZE_T_MAX.
Expand Down Expand Up @@ -817,12 +816,10 @@ PyTypeObject PyRange_Type = {
static PyObject *
rangeiter_next(_PyRangeIterObject *r)
{
long len = FT_ATOMIC_LOAD_LONG_RELAXED(r->len);
if (len > 0) {
long result = FT_ATOMIC_LOAD_LONG_RELAXED(r->start);
FT_ATOMIC_STORE_LONG_RELAXED(r->start, result + r->step);
// Relaxed ops for maximum speed and minimum thread-safety.
FT_ATOMIC_STORE_LONG_RELAXED(r->len, len - 1);
if (r->len > 0) {
long result = r->start;
r->start = result + r->step;
r->len--;
return PyLong_FromLong(result);
}
return NULL;
Expand All @@ -831,7 +828,7 @@ rangeiter_next(_PyRangeIterObject *r)
static PyObject *
rangeiter_len(_PyRangeIterObject *r, PyObject *Py_UNUSED(ignored))
{
return PyLong_FromLong(FT_ATOMIC_LOAD_LONG_RELAXED(r->len));
return PyLong_FromLong(r->len);
}

PyDoc_STRVAR(length_hint_doc,
Expand All @@ -844,11 +841,10 @@ rangeiter_reduce(_PyRangeIterObject *r, PyObject *Py_UNUSED(ignored))
PyObject *range;

/* create a range object for pickling */
long lstart = FT_ATOMIC_LOAD_LONG_RELAXED(r->start);
start = PyLong_FromLong(lstart);
start = PyLong_FromLong(r->start);
if (start == NULL)
goto err;
stop = PyLong_FromLong(lstart + FT_ATOMIC_LOAD_LONG_RELAXED(r->len) * r->step);
stop = PyLong_FromLong(r->start + r->len * r->step);
if (stop == NULL)
goto err;
step = PyLong_FromLong(r->step);
Expand All @@ -875,14 +871,12 @@ rangeiter_setstate(_PyRangeIterObject *r, PyObject *state)
if (index == -1 && PyErr_Occurred())
return NULL;
/* silently clip the index value */
long len = FT_ATOMIC_LOAD_LONG_RELAXED(r->len);
if (index < 0)
index = 0;
else if (index > len)
index = len; /* exhausted iterator */
FT_ATOMIC_STORE_LONG_RELAXED(r->start,
FT_ATOMIC_LOAD_LONG_RELAXED(r->start) + index * r->step);
FT_ATOMIC_STORE_LONG_RELAXED(r->len, len - index);
else if (index > r->len)
index = r->len; /* exhausted iterator */
r->start += index * r->step;
r->len -= index;
Py_RETURN_NONE;
}

Expand Down
10 changes: 4 additions & 6 deletions Objects/tupleobject.c
Original file line number Diff line number Diff line change
Expand Up @@ -1020,15 +1020,13 @@ tupleiter_next(PyObject *self)
return NULL;
assert(PyTuple_Check(seq));

Py_ssize_t idx = FT_ATOMIC_LOAD_SSIZE_RELAXED(it->it_index);
if ((size_t)idx < (size_t)PyTuple_GET_SIZE(seq)) {
item = PyTuple_GET_ITEM(seq, idx++);
FT_ATOMIC_STORE_SSIZE_RELAXED(it->it_index, idx);
if (it->it_index < PyTuple_GET_SIZE(seq)) {
item = PyTuple_GET_ITEM(seq, it->it_index);
++it->it_index;
return Py_NewRef(item);
}

FT_ATOMIC_STORE_SSIZE_RELAXED(it->it_index, -1);
FT_ATOMIC_STORE_PTR_RELAXED(it->it_seq, NULL);
it->it_seq = NULL;
Py_DECREF(seq);
return NULL;
}
Expand Down
Loading