Skip to content

Commit 933820d

Browse files
committed
infinite parallel stream doesn't work
1 parent b084258 commit 933820d

File tree

1 file changed

+25
-15
lines changed

1 file changed

+25
-15
lines changed

stream/parallelstream.py

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,7 @@ def splitted(iterable, pre, offset):
2222
try:
2323
while True:
2424
index += 1
25-
with ParallelUtils.iterLock:
26-
elem = next(iterable)
25+
elem = next(iterable)
2726
if(index < pre):
2827
continue
2928
if(index % offset == pre):
@@ -33,19 +32,20 @@ def splitted(iterable, pre, offset):
3332

3433
@staticmethod
3534
def _iterator(iterable):
36-
while True:
37-
try:
35+
try:
36+
while True:
3837
yield next(iterable)
39-
except:
40-
return
38+
except:
39+
return
4140

4241
@staticmethod
4342
def cloneSplit(iterable, count):
44-
return [ParallelUtils._iterator(iterable) for _ in range(count)]
43+
it = ParallelUtils._iterator(iterable)
44+
return [it for _ in range(count)]
4545

4646
@staticmethod
4747
def split(iterable, count):
48-
return [ParallelUtils.splitted(it, index, count) for index, it in enumerate(iter(tee(iterable, count)))]
48+
return [ParallelUtils.splitted(it, index, count) for index, it in enumerate(tee(iterable, count))]
4949

5050
@staticmethod
5151
def finiteSplit(iterable, count):
@@ -66,8 +66,8 @@ class ParallelStream(Stream):
6666

6767
def __init__(self, iterable):
6868

69-
self.__streams = [StreamThread(Stream(iterator))
70-
for iterator in ParallelUtils.cloneSplit(iterable, self.PROCESS)]
69+
self.__streams = [StreamThread(Stream(it))
70+
for it in ParallelUtils.split(iterable, self.PROCESS)]
7171

7272
for _stream in self.__streams:
7373
_stream.start()
@@ -234,7 +234,11 @@ def count(self):
234234
results = [_stream.getResult()
235235
for _stream in self.__streams if _stream.getResult()]
236236

237-
return sum(results)
237+
res = 0
238+
for r in results:
239+
res += r
240+
241+
return res
238242

239243
def toList(self):
240244

@@ -244,8 +248,8 @@ def toList(self):
244248
for _stream in self.__streams:
245249
_stream.join()
246250

247-
sublists = [_stream.getResult().get()
248-
for _stream in self.__streams if _stream.getResult().isPresent()]
251+
sublists = [_stream.getResult()
252+
for _stream in self.__streams]
249253

250254
results = []
251255
for sub in sublists:
@@ -260,8 +264,8 @@ def toSet(self):
260264
for _stream in self.__streams:
261265
_stream.join()
262266

263-
subsets = [_stream.getResult().get()
264-
for _stream in self.__streams if _stream.getResult().isPresent()]
267+
subsets = [_stream.getResult()
268+
for _stream in self.__streams]
265269

266270
results = set()
267271
for sub in subsets:
@@ -272,3 +276,9 @@ def toSet(self):
272276

273277
def get(self):
274278
return self.__streams
279+
280+
def __eq__(self, other):
281+
return self.toSet() == other.toSet()
282+
283+
def __iter__(self):
284+
return iter(self.toList())

0 commit comments

Comments
 (0)