@@ -22,8 +22,7 @@ def splitted(iterable, pre, offset):
2222 try :
2323 while True :
2424 index += 1
25- with ParallelUtils .iterLock :
26- elem = next (iterable )
25+ elem = next (iterable )
2726 if (index < pre ):
2827 continue
2928 if (index % offset == pre ):
@@ -33,19 +32,20 @@ def splitted(iterable, pre, offset):
3332
3433 @staticmethod
3534 def _iterator (iterable ):
36- while True :
37- try :
35+ try :
36+ while True :
3837 yield next (iterable )
39- except :
40- return
38+ except :
39+ return
4140
4241 @staticmethod
4342 def cloneSplit (iterable , count ):
44- return [ParallelUtils ._iterator (iterable ) for _ in range (count )]
43+ it = ParallelUtils ._iterator (iterable )
44+ return [it for _ in range (count )]
4545
4646 @staticmethod
4747 def split (iterable , count ):
48- return [ParallelUtils .splitted (it , index , count ) for index , it in enumerate (iter ( tee (iterable , count ) ))]
48+ return [ParallelUtils .splitted (it , index , count ) for index , it in enumerate (tee (iterable , count ))]
4949
5050 @staticmethod
5151 def finiteSplit (iterable , count ):
@@ -66,8 +66,8 @@ class ParallelStream(Stream):
6666
6767 def __init__ (self , iterable ):
6868
69- self .__streams = [StreamThread (Stream (iterator ))
70- for iterator in ParallelUtils .cloneSplit (iterable , self .PROCESS )]
69+ self .__streams = [StreamThread (Stream (it ))
70+ for it in ParallelUtils .split (iterable , self .PROCESS )]
7171
7272 for _stream in self .__streams :
7373 _stream .start ()
@@ -234,7 +234,11 @@ def count(self):
234234 results = [_stream .getResult ()
235235 for _stream in self .__streams if _stream .getResult ()]
236236
237- return sum (results )
237+ res = 0
238+ for r in results :
239+ res += r
240+
241+ return res
238242
239243 def toList (self ):
240244
@@ -244,8 +248,8 @@ def toList(self):
244248 for _stream in self .__streams :
245249 _stream .join ()
246250
247- sublists = [_stream .getResult (). get ()
248- for _stream in self .__streams if _stream . getResult (). isPresent () ]
251+ sublists = [_stream .getResult ()
252+ for _stream in self .__streams ]
249253
250254 results = []
251255 for sub in sublists :
@@ -260,8 +264,8 @@ def toSet(self):
260264 for _stream in self .__streams :
261265 _stream .join ()
262266
263- subsets = [_stream .getResult (). get ()
264- for _stream in self .__streams if _stream . getResult (). isPresent () ]
267+ subsets = [_stream .getResult ()
268+ for _stream in self .__streams ]
265269
266270 results = set ()
267271 for sub in subsets :
@@ -272,3 +276,9 @@ def toSet(self):
272276
273277 def get (self ):
274278 return self .__streams
279+
280+ def __eq__ (self , other ):
281+ return self .toSet () == other .toSet ()
282+
283+ def __iter__ (self ):
284+ return iter (self .toList ())
0 commit comments