Skip to content

Commit 069b2f1

Browse files
committed
Partial work.
1 parent 35fc88d commit 069b2f1

File tree

4 files changed

+155
-21
lines changed

4 files changed

+155
-21
lines changed

README.rst

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -88,10 +88,14 @@ Querying The Tree: Gate
8888
~~~~~~~~~~~~~~~~~~~~~~~
8989

9090
The Priority 'gate' is an alternative approach of accessing HTTP/2 priorities
91-
based on `this presentation`_. It works by providing an iterator that acts as a
92-
'gate'. Each value popped off the iterator is the next stream that should be
93-
acted on. The PriorityTree can be mutated on the fly to add, remove, block, and
94-
unblock streams, which affects what the next value from the iterator will be.
91+
based on a modified version of the `approach used by nghttp2`_, which itself is
92+
based on the approach used in H2O. Our implementation more closely ties to the
93+
version used by H2O.
94+
95+
It works by providing an iterator that acts as a 'gate'. Each value popped off
96+
the iterator is the next stream that should be acted on. The underlying
97+
``PriorityTree`` can be mutated on the fly to add, remove, block, and unblock
98+
streams, which affects what the next value from the iterator will be.
9599

96100
This works well when combined with some kind of signaling mechanism to block
97101
and unblock threads of execution. This would allow you to do something like
@@ -115,7 +119,7 @@ For example:
115119
>>> for stream_id in p.gate()
116120
... now_blocked = unblock(stream_id)
117121
... if now_blocked:
118-
... p.blocked(stream_id)
122+
... p.block(stream_id)
119123
... unblocked = all_unblocked_streams()
120124
... for unblocked_stream_id in unblocked:
121125
... p.unblock(unblocked_stream_id)
@@ -136,4 +140,4 @@ repository.
136140

137141

138142
.. _RFC 7540 Section 5.3 (Stream Priority): https://tools.ietf.org/html/rfc7540#section-5.3
139-
.. _this presentation: http://example.com/
143+
.. _approach used by nghttp2: https://nghttp2.org/blog/2015/11/11/stream-scheduling-utilizing-http2-priority/

src/priority/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@
22
"""
33
priority: HTTP/2 priority implementation for Python
44
"""
5-
from .priority import Stream, Priorities, PriorityTree # noqa
5+
from .priority import Stream, PriorityTree # noqa

src/priority/priority.py

Lines changed: 93 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ def __init__(self, stream_id, weight=16):
2222
self.stream_id = stream_id
2323
self.weight = weight
2424
self.children = []
25+
self.parent = None
2526
self.child_queue = queue.PriorityQueue()
2627
self.active = True
2728
self.last_weight = 0
@@ -30,22 +31,50 @@ def add_child(self, child):
3031
"""
3132
Add a stream that depends on this one.
3233
"""
34+
child.parent = self
3335
self.children.append(child)
34-
self.child_queue.put((self.current_val, child))
36+
self.child_queue.put((self.last_weight, child))
3537

3638
def add_child_exclusive(self, child):
3739
"""
3840
Add a stream that exclusively depends on this one.
3941
"""
4042
old_children = self.children
41-
self.children = [child]
43+
self.children = []
44+
self.child_queue = queue.PriorityQueue()
45+
self.last_weight = 0
46+
self.add_child(child)
4247

4348
for old_child in old_children:
4449
child.add_child(old_child)
4550

51+
def remove_child(self, child):
52+
"""
53+
Removes a child stream from this stream. This is a potentially somewhat
54+
expensive operation.
55+
"""
56+
# To do this we do the following:
57+
#
58+
# - remove the child stream from the list of children
59+
# - build a new priority queue, filtering out the child when we find
60+
# it in the old one
61+
self.children.remove(child)
62+
63+
new_queue = queue.PriorityQueue()
64+
65+
while not self.child_queue.empty():
66+
level, stream = self.child_queue.pop()
67+
if stream == child:
68+
continue
69+
70+
new_queue.put((level, stream))
71+
72+
self.child_queue = new_queue
73+
4674
def schedule(self):
4775
"""
48-
Returns the stream ID of the next child to schedule.
76+
Returns the stream ID of the next child to schedule. Potentially
77+
recurses down the tree of priorities.
4978
"""
5079
# Cannot be called on active streams.
5180
assert not self.active
@@ -57,30 +86,50 @@ def schedule(self):
5786
else:
5887
next_stream = child.schedule()
5988

60-
new_level = level + child.weight
61-
self.child_queue.put((new_level, child))
62-
self.last_weight = new_level
89+
self.last_weight = level
90+
91+
level += (256 // child.weight)
92+
self.child_queue.put((level, child))
6393

6494
return next_stream
6595

6696
# Custom repr
6797
def __repr__(self):
6898
return "Stream<id=%d, weight=%d>" % (self.stream_id, self.weight)
6999

70-
# Custom equality
100+
# Custom comparison
71101
def __eq__(self, other):
72-
if not isinstance(other, self.__class__):
73-
return False
102+
if not isinstance(other, Stream):
103+
raise TypeError("Streams can only be equal to other streams")
74104

75-
return (
76-
self.stream_id == other.stream_id and
77-
self.weight == other.weight and
78-
self.children == other.children
79-
)
105+
return self.stream_id == other.stream_id
80106

81107
def __ne__(self, other):
82108
return not self.__eq__(other)
83109

110+
def __lt__(self, other):
111+
if not isinstance(other, Stream):
112+
return NotImplemented
113+
114+
return self.stream_id < other.stream_id
115+
116+
def __le__(self, other):
117+
if not isinstance(other, Stream):
118+
return NotImplemented
119+
120+
return self.stream_id <= other.stream_id
121+
122+
def __gt__(self, other):
123+
if not isinstance(other, Stream):
124+
return NotImplemented
125+
126+
return self.stream_id > other.stream_id
127+
128+
def __ge__(self, other):
129+
if not isinstance(other, Stream):
130+
return NotImplemented
131+
132+
return self.stream_id >= other.stream_id
84133

85134

86135
class PriorityTree(object):
@@ -133,3 +182,33 @@ def insert_stream(self,
133182
parent = self._streams[depends_on]
134183
parent.add_child(stream)
135184
self._streams[stream_id] = stream
185+
186+
def remove_stream(self, stream_id):
187+
"""
188+
Removes a stream from the priority tree.
189+
"""
190+
# TODO: At some point we should actually prune streams we no longer
191+
# need. For now, just mark it permanently blocked.
192+
self._streams[stream_id].active = False
193+
194+
def block(self, stream_id):
195+
"""
196+
Marks a given stream as blocked, with no data to send.
197+
"""
198+
self._streams[stream_id].active = False
199+
200+
def unblock(self, stream_id):
201+
"""
202+
Marks a given stream as unblocked, with more data to send.
203+
"""
204+
self._streams[stream_id].active = True
205+
206+
# The iterator protocol
207+
def __iter__(self):
208+
return self
209+
210+
def __next__(self):
211+
return self._root_stream.schedule()
212+
213+
def next(self):
214+
return self.__next__()

test/test_priority.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,27 @@
55
66
Tests for the Priority trees
77
"""
8+
from __future__ import division
9+
810
import itertools
911

12+
import random
13+
1014
from hypothesis import given
1115
from hypothesis.strategies import integers, lists, tuples
1216

1317
import priority
1418

19+
try:
20+
from functools import reduce
21+
except ImportError:
22+
pass
23+
24+
try:
25+
from math import gcd
26+
except ImportError:
27+
from fractions import gcd
28+
1529

1630
STREAMS_AND_WEIGHTS = lists(
1731
elements=tuples(
@@ -28,3 +42,40 @@ def test_stream_repr(self):
2842
"""
2943
s = priority.Stream(stream_id=80, weight=16)
3044
assert repr(s) == "Stream<id=80, weight=16>"
45+
46+
47+
class TestPriorityTree(object):
48+
@given(STREAMS_AND_WEIGHTS)
49+
def test_period_of_repetition(self, streams_and_weights):
50+
"""
51+
The period of repetition of a priority sequence is given by the
52+
formula: sum(weights) / gcd(weights). Once that many values have been
53+
pulled out, the sequence should repeat identically.
54+
"""
55+
initial = random.getstate()
56+
57+
p = priority.PriorityTree()
58+
weights = []
59+
60+
for stream, weight in streams_and_weights:
61+
p.insert_stream(stream_id=stream, weight=weight)
62+
weights.append(weight)
63+
64+
if weights:
65+
period = sum(weights) // reduce(gcd, weights)
66+
else:
67+
period = 0
68+
69+
# Pop off the first n elements, which will always be evenly
70+
# distributed.
71+
print(streams_and_weights)
72+
for _ in weights:
73+
print(" " + str(next(p)))
74+
75+
pattern = [next(p) for _ in range(period)]
76+
print(" " + str(pattern))
77+
pattern = itertools.cycle(pattern)
78+
79+
for i in range(period * 20):
80+
assert random.getstate() == initial
81+
assert next(p) == next(pattern), i

0 commit comments

Comments
 (0)