@@ -22,6 +22,7 @@ def __init__(self, stream_id, weight=16):
2222 self .stream_id = stream_id
2323 self .weight = weight
2424 self .children = []
25+ self .parent = None
2526 self .child_queue = queue .PriorityQueue ()
2627 self .active = True
2728 self .last_weight = 0
@@ -30,22 +31,50 @@ def add_child(self, child):
3031 """
3132 Add a stream that depends on this one.
3233 """
34+ child .parent = self
3335 self .children .append (child )
34- self .child_queue .put ((self .current_val , child ))
36+ self .child_queue .put ((self .last_weight , child ))
3537
3638 def add_child_exclusive (self , child ):
3739 """
3840 Add a stream that exclusively depends on this one.
3941 """
4042 old_children = self .children
41- self .children = [child ]
43+ self .children = []
44+ self .child_queue = queue .PriorityQueue ()
45+ self .last_weight = 0
46+ self .add_child (child )
4247
4348 for old_child in old_children :
4449 child .add_child (old_child )
4550
51+ def remove_child (self , child ):
52+ """
53+ Removes a child stream from this stream. This is a potentially somewhat
54+ expensive operation.
55+ """
56+ # To do this we do the following:
57+ #
58+ # - remove the child stream from the list of children
59+ # - build a new priority queue, filtering out the child when we find
60+ # it in the old one
61+ self .children .remove (child )
62+
63+ new_queue = queue .PriorityQueue ()
64+
65+ while not self .child_queue .empty ():
66+ level , stream = self .child_queue .pop ()
67+ if stream == child :
68+ continue
69+
70+ new_queue .put ((level , stream ))
71+
72+ self .child_queue = new_queue
73+
4674 def schedule (self ):
4775 """
48- Returns the stream ID of the next child to schedule.
76+ Returns the stream ID of the next child to schedule. Potentially
77+ recurses down the tree of priorities.
4978 """
5079 # Cannot be called on active streams.
5180 assert not self .active
@@ -57,30 +86,50 @@ def schedule(self):
5786 else :
5887 next_stream = child .schedule ()
5988
60- new_level = level + child .weight
61- self .child_queue .put ((new_level , child ))
62- self .last_weight = new_level
89+ self .last_weight = level
90+
91+ level += (256 // child .weight )
92+ self .child_queue .put ((level , child ))
6393
6494 return next_stream
6595
6696 # Custom repr
6797 def __repr__ (self ):
6898 return "Stream<id=%d, weight=%d>" % (self .stream_id , self .weight )
6999
70- # Custom equality
100+ # Custom comparison
71101 def __eq__ (self , other ):
72- if not isinstance (other , self . __class__ ):
73- return False
102+ if not isinstance (other , Stream ):
103+ raise TypeError ( "Streams can only be equal to other streams" )
74104
75- return (
76- self .stream_id == other .stream_id and
77- self .weight == other .weight and
78- self .children == other .children
79- )
105+ return self .stream_id == other .stream_id
80106
81107 def __ne__ (self , other ):
82108 return not self .__eq__ (other )
83109
110+ def __lt__ (self , other ):
111+ if not isinstance (other , Stream ):
112+ return NotImplemented
113+
114+ return self .stream_id < other .stream_id
115+
116+ def __le__ (self , other ):
117+ if not isinstance (other , Stream ):
118+ return NotImplemented
119+
120+ return self .stream_id <= other .stream_id
121+
122+ def __gt__ (self , other ):
123+ if not isinstance (other , Stream ):
124+ return NotImplemented
125+
126+ return self .stream_id > other .stream_id
127+
128+ def __ge__ (self , other ):
129+ if not isinstance (other , Stream ):
130+ return NotImplemented
131+
132+ return self .stream_id >= other .stream_id
84133
85134
86135class PriorityTree (object ):
@@ -133,3 +182,33 @@ def insert_stream(self,
133182 parent = self ._streams [depends_on ]
134183 parent .add_child (stream )
135184 self ._streams [stream_id ] = stream
185+
186+ def remove_stream (self , stream_id ):
187+ """
188+ Removes a stream from the priority tree.
189+ """
190+ # TODO: At some point we should actually prune streams we no longer
191+ # need. For now, just mark it permanently blocked.
192+ self ._streams [stream_id ].active = False
193+
194+ def block (self , stream_id ):
195+ """
196+ Marks a given stream as blocked, with no data to send.
197+ """
198+ self ._streams [stream_id ].active = False
199+
200+ def unblock (self , stream_id ):
201+ """
202+ Marks a given stream as unblocked, with more data to send.
203+ """
204+ self ._streams [stream_id ].active = True
205+
206+ # The iterator protocol
207+ def __iter__ (self ):
208+ return self
209+
210+ def __next__ (self ):
211+ return self ._root_stream .schedule ()
212+
213+ def next (self ):
214+ return self .__next__ ()
0 commit comments