Skip to content

Commit 9f1c3b1

Browse files
committed
Better handling of blocked streams.
1 parent 9062061 commit 9f1c3b1

File tree

3 files changed

+168
-18
lines changed

3 files changed

+168
-18
lines changed

src/priority/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@
22
"""
33
priority: HTTP/2 priority implementation for Python
44
"""
5-
from .priority import Stream, PriorityTree # noqa
5+
from .priority import Stream, PriorityTree, DeadlockError # noqa

src/priority/priority.py

Lines changed: 42 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,14 @@
1616
import queue
1717

1818

19+
class DeadlockError(Exception):
20+
"""
21+
Raised when there are no streams that can make progress: all streams are
22+
blocked.
23+
"""
24+
pass
25+
26+
1927
class Stream(object):
2028
"""
2129
Priority information for a given stream.
@@ -82,18 +90,34 @@ def schedule(self):
8290
# Cannot be called on active streams.
8391
assert not self.active
8492

85-
level, child = self.child_queue.get()
86-
87-
if child.active:
88-
next_stream = child.stream_id
89-
else:
90-
next_stream = child.schedule()
91-
92-
self.last_weight = level
93-
94-
level += (256 + child._defecit) // child.weight
95-
child._defecit = (256 + child._defecit) % child.weight
96-
self.child_queue.put((level, child))
93+
next_stream = None
94+
popped_streams = []
95+
96+
# Spin looking for the next active stream. Everything we pop off has
97+
# to be rescheduled, even if it turns out none of them were active at
98+
# this time.
99+
try:
100+
while next_stream is None:
101+
# If the queue is empty, immediately fail.
102+
val = self.child_queue.get(block=False)
103+
popped_streams.append(val)
104+
level, child = val
105+
106+
if child.active:
107+
next_stream = child.stream_id
108+
else:
109+
# Guard against the possibility that the child also has no
110+
# suitable children.
111+
try:
112+
next_stream = child.schedule()
113+
except queue.Empty:
114+
continue
115+
finally:
116+
for level, child in popped_streams:
117+
self.last_weight = level
118+
level += (256 + child._defecit) // child.weight
119+
child._defecit = (256 + child._defecit) % child.weight
120+
self.child_queue.put((level, child))
97121

98122
return next_stream
99123

@@ -179,6 +203,7 @@ def insert_stream(self,
179203
assert depends_on is not None
180204
parent_stream = self._streams[depends_on]
181205
self._exclusive_insert(parent_stream, stream)
206+
self._streams[stream_id] = stream
182207
return
183208

184209
if not depends_on:
@@ -206,14 +231,18 @@ def unblock(self, stream_id):
206231
"""
207232
Marks a given stream as unblocked, with more data to send.
208233
"""
234+
# When a stream becomes unblocked,
209235
self._streams[stream_id].active = True
210236

211237
# The iterator protocol
212238
def __iter__(self):
213239
return self
214240

215241
def __next__(self):
216-
return self._root_stream.schedule()
242+
try:
243+
return self._root_stream.schedule()
244+
except queue.Empty:
245+
raise DeadlockError("No unblocked streams to schedule.")
217246

218247
def next(self):
219248
return self.__next__()

test/test_priority.py

Lines changed: 125 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,12 @@
1010
import collections
1111
import itertools
1212

13+
import pytest
14+
1315
from hypothesis import given
14-
from hypothesis.strategies import integers, lists, tuples
16+
from hypothesis.strategies import (
17+
integers, lists, tuples, composite, sampled_from
18+
)
1519

1620
import priority
1721

@@ -23,6 +27,83 @@
2327
unique_by=lambda x: x[0],
2428
)
2529

30+
BLOCKED_AND_ACTIVE = lists(
31+
elements=sampled_from([1, 3, 5, 7, 9, 11]),
32+
unique=True,
33+
).map(
34+
lambda blocked: (blocked, active_readme_streams_from_filter(blocked))
35+
)
36+
37+
UNBLOCKED_AND_ACTIVE = lists(
38+
elements=sampled_from([1, 3, 5, 7, 9, 11]),
39+
unique=True,
40+
).map(
41+
lambda unblocked: (unblocked, active_readme_streams_from_filter(
42+
unblocked, blocked=False
43+
))
44+
)
45+
46+
47+
48+
def readme_tree():
49+
"""
50+
Provide a tree configured as the one in the readme.
51+
"""
52+
p = priority.PriorityTree()
53+
p.insert_stream(stream_id=1)
54+
p.insert_stream(stream_id=3)
55+
p.insert_stream(stream_id=5, depends_on=1)
56+
p.insert_stream(stream_id=7, weight=32)
57+
p.insert_stream(stream_id=9, depends_on=7, weight=8)
58+
p.insert_stream(stream_id=11, depends_on=7, exclusive=True)
59+
return p
60+
61+
62+
def active_readme_streams_from_filter(filtered, blocked=True):
63+
"""
64+
Given a collection of filtered streams, determine which ones are active.
65+
This applies only to the readme tree at this time, though in future it
66+
should be possible to apply this to an arbitrary tree.
67+
68+
If ``blocked`` is ``True``, the filter is a set of blocked streams. If
69+
``False``, it's a collection of unblocked streams.
70+
"""
71+
tree = {
72+
1: {
73+
5: {},
74+
},
75+
3: {},
76+
7: {
77+
11: {
78+
9: {},
79+
},
80+
},
81+
}
82+
filtered = set(filtered)
83+
84+
def get_expected(tree):
85+
expected = []
86+
87+
for stream_id in tree:
88+
if stream_id not in filtered and blocked:
89+
expected.append(stream_id)
90+
elif stream_id in filtered and not blocked:
91+
expected.append(stream_id)
92+
else:
93+
expected.extend(get_expected(tree[stream_id]))
94+
95+
return expected
96+
97+
return get_expected(tree)
98+
99+
100+
def active_streams_from_unblocked(unblocked):
101+
"""
102+
Given a collection of unblocked streams, determine which ones are active.
103+
This applies only to the readme tree at this time, though in future it
104+
should be possible to apply this to an arbitrary tree.
105+
"""
106+
26107

27108
class TestStream(object):
28109
def test_stream_repr(self):
@@ -43,15 +124,55 @@ class TestPriorityTreeManual(object):
43124
Hypothesis-based ones for the same data, but getting Hypothesis to generate
44125
useful data in this case is going to be quite tricky.
45126
"""
46-
def test_priority_tree_initially_outputs_all_stream_ids(self, readme_tree):
127+
@given(BLOCKED_AND_ACTIVE)
128+
def test_priority_tree_initially_outputs_all_stream_ids(self,
129+
blocked_expected):
47130
"""
48131
The first iterations of the priority tree initially output the active
49132
streams, in order of stream ID, regardless of weight.
50133
"""
51-
expected = [1, 3, 7]
52-
result = [next(readme_tree) for _ in range(len(expected))]
134+
tree = readme_tree()
135+
blocked = blocked_expected[0]
136+
expected = blocked_expected[1]
137+
138+
for stream_id in blocked:
139+
tree.block(stream_id)
140+
141+
result = [next(tree) for _ in range(len(expected))]
53142
assert expected == result
54143

144+
@given(UNBLOCKED_AND_ACTIVE)
145+
def test_priority_tree_blocking_is_isomorphic(self,
146+
allowed_expected):
147+
"""
148+
Blocking everything and then unblocking certain ones has the same
149+
effect as blocking specific streams.
150+
"""
151+
tree = readme_tree()
152+
allowed = allowed_expected[0]
153+
expected = allowed_expected[1]
154+
155+
for stream_id in range(1, 12, 2):
156+
tree.block(stream_id)
157+
158+
for stream_id in allowed:
159+
tree.unblock(stream_id)
160+
161+
result = [next(tree) for _ in range(len(expected))]
162+
assert expected == result
163+
164+
def test_priority_tree_raises_deadlock_error_if_all_blocked(self):
165+
"""
166+
Assuming all streams are blocked and none can progress, asking for the
167+
one with the next highest priority fires a DeadlockError.
168+
"""
169+
tree = readme_tree()
170+
for stream_id in range(1, 12, 2):
171+
tree.block(stream_id)
172+
173+
with pytest.raises(priority.DeadlockError):
174+
next(tree)
175+
55176

56177
class TestPriorityTreeOutput(object):
57178
"""

0 commit comments

Comments
 (0)