Skip to content
Merged
Prev Previous commit
Next Next commit
refactor code based on review
  • Loading branch information
eendebakpt committed Feb 3, 2026
commit dbb33bdf3a6889d6ecac5936d85ca8327df889a7
146 changes: 29 additions & 117 deletions Lib/test/test_free_threading/test_itertools.py
Original file line number Diff line number Diff line change
@@ -1,147 +1,59 @@
import unittest
from threading import Thread, Barrier
from itertools import batched, chain, combinations_with_replacement, cycle, permutations
from test.support import threading_helper


threading_helper.requires_working_threading(module=True)

class ItertoolsThreading(unittest.TestCase):

@threading_helper.reap_threads
def test_batched(self):
number_of_threads = 10
number_of_iterations = 20
barrier = Barrier(number_of_threads)
def work(it):
barrier.wait()
while True:
try:
next(it)
except StopIteration:
break

data = tuple(range(1000))
for it in range(number_of_iterations):
batch_iterator = batched(data, 2)
worker_threads = []
for ii in range(number_of_threads):
worker_threads.append(
Thread(target=work, args=[batch_iterator]))
def work_iterator(it):
while True:
try:
next(it)
except StopIteration:
break

with threading_helper.start_threads(worker_threads):
pass

barrier.reset()
class ItertoolsThreading(unittest.TestCase):

@threading_helper.reap_threads
def test_cycle(self):
number_of_threads = 6
def test_batched(self):
number_of_iterations = 10
number_of_cycles = 400
for _ in range(number_of_iterations):
it = batched(tuple(range(1000)), 2)
threading_helper.run_concurrently(work_iterator, nthreads=10, args=[it])

barrier = Barrier(number_of_threads)
@threading_helper.reap_threads
def test_cycle(self):
def work(it):
barrier.wait()
for _ in range(number_of_cycles):
try:
next(it)
except StopIteration:
pass

data = (1, 2, 3, 4)
for it in range(number_of_iterations):
cycle_iterator = cycle(data)
worker_threads = []
for ii in range(number_of_threads):
worker_threads.append(
Thread(target=work, args=[cycle_iterator]))
for _ in range(400):
next(it)

with threading_helper.start_threads(worker_threads):
pass

barrier.reset()
number_of_iterations = 6
for _ in range(number_of_iterations):
it = cycle((1, 2, 3, 4))
threading_helper.run_concurrently(work, nthreads=6, args=[it])

@threading_helper.reap_threads
def test_chain(self):
number_of_threads = 6
number_of_iterations = 20

barrier = Barrier(number_of_threads)
def work(it):
barrier.wait()
while True:
try:
next(it)
except StopIteration:
break

data = [(1, )] * 200
for it in range(number_of_iterations):
chain_iterator = chain(*data)
worker_threads = []
for ii in range(number_of_threads):
worker_threads.append(
Thread(target=work, args=[chain_iterator]))

with threading_helper.start_threads(worker_threads):
pass

barrier.reset()
number_of_iterations = 10
for _ in range(number_of_iterations):
it = chain(*[(1,)] * 200)
threading_helper.run_concurrently(work_iterator, nthreads=6, args=[it])

@threading_helper.reap_threads
def test_combinations_with_replacement(self):
number_of_threads = 6
number_of_iterations = 36
data = tuple(range(2))

barrier = Barrier(number_of_threads)
def work(it):
barrier.wait()
while True:
try:
next(it)
except StopIteration:
break

number_of_iterations = 6
for _ in range(number_of_iterations):
cwr_iterator = combinations_with_replacement(data, 2)
worker_threads = []
for _ in range(number_of_threads):
worker_threads.append(
Thread(target=work, args=[cwr_iterator]))

with threading_helper.start_threads(worker_threads):
pass

barrier.reset()
it = combinations_with_replacement(tuple(range(2)), 2)
threading_helper.run_concurrently(work_iterator, nthreads=6, args=[it])

@threading_helper.reap_threads
def test_permutations(self):
number_of_threads = 6
number_of_iterations = 36
data = tuple(range(4))

barrier = Barrier(number_of_threads)
def work(it):
barrier.wait()
while True:
try:
next(it)
except StopIteration:
break

number_of_iterations = 6
for _ in range(number_of_iterations):
perm_iterator = permutations(data, 2)
worker_threads = []
for _ in range(number_of_threads):
worker_threads.append(
Thread(target=work, args=[perm_iterator]))

with threading_helper.start_threads(worker_threads):
pass

barrier.reset()
it = permutations(tuple(range(4)), 2)
threading_helper.run_concurrently(work_iterator, nthreads=6, args=[it])


if __name__ == "__main__":
Expand Down
Loading