-
-
Notifications
You must be signed in to change notification settings - Fork 103
Expand file tree
/
Copy pathtest_multibar.py
More file actions
391 lines (311 loc) · 11.6 KB
/
Copy pathtest_multibar.py
File metadata and controls
391 lines (311 loc) · 11.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
import contextlib
import io
import random
import threading
import time
import pytest
import progressbar
N = 10
BARS = 3
SLEEP = 0.002
def test_multi_progress_bar_out_of_range() -> None:
widgets = [
progressbar.MultiProgressBar('multivalues'),
]
bar = progressbar.ProgressBar(widgets=widgets, max_value=10)
with pytest.raises(ValueError):
bar.update(multivalues=[123])
with pytest.raises(ValueError):
bar.update(multivalues=[-1])
def test_multibar() -> None:
multibar = progressbar.MultiBar(
sort_keyfunc=lambda bar: bar.label,
remove_finished=0.005,
)
multibar.show_initial = False
multibar.render(force=True)
multibar.show_initial = True
multibar.render(force=True)
multibar.start()
multibar.append_label = False
multibar.prepend_label = True
# Test handling of progressbars that don't call the super constructors
bar = progressbar.ProgressBar(max_value=N)
bar.index = -1
multibar['x'] = bar
bar.start()
# Test twice for other code paths
multibar['x'] = bar
multibar._label_bar(bar)
multibar._label_bar(bar)
bar.finish()
del multibar['x']
multibar.prepend_label = False
multibar.append_label = True
append_bar = progressbar.ProgressBar(max_value=N)
append_bar.start()
multibar._label_bar(append_bar)
multibar['append'] = append_bar
multibar.render(force=True)
def do_something(bar):
for j in bar(range(N)):
time.sleep(0.01)
bar.update(j)
for i in range(BARS):
thread = threading.Thread(
target=do_something,
args=(multibar[f'bar {i}'],),
)
thread.start()
for bar in list(multibar.values()):
for j in range(N):
bar.update(j)
time.sleep(SLEEP)
multibar.render(force=True)
multibar.remove_finished = False
multibar.show_finished = False
append_bar.finish()
multibar.render(force=True)
multibar.join(0.1)
multibar.stop(0.1)
@pytest.mark.parametrize(
'sort_key',
[
None,
'index',
'label',
'value',
'percentage',
progressbar.SortKey.CREATED,
progressbar.SortKey.LABEL,
progressbar.SortKey.VALUE,
progressbar.SortKey.PERCENTAGE,
],
)
def test_multibar_sorting(sort_key) -> None:
with progressbar.MultiBar() as multibar:
for i in range(BARS):
label = f'bar {i}'
multibar[label] = progressbar.ProgressBar(max_value=N)
for bar in multibar.values():
for _j in bar(range(N)):
assert bar.started()
time.sleep(SLEEP)
for bar in multibar.values():
assert bar.finished()
def test_offset_bar() -> None:
with progressbar.ProgressBar(line_offset=2) as bar:
for i in range(N):
bar.update(i)
def test_multibar_show_finished() -> None:
multibar = progressbar.MultiBar(show_finished=True)
multibar['bar'] = progressbar.ProgressBar(max_value=N)
multibar.render(force=True)
with progressbar.MultiBar(show_finished=False) as multibar:
multibar.finished_format = 'finished: {label}'
for i in range(3):
multibar[f'bar {i}'] = progressbar.ProgressBar(max_value=N)
for bar in multibar.values():
for i in range(N):
bar.update(i)
time.sleep(SLEEP)
# The context manager waits for all bars to finish
bar.finish()
multibar.render(force=True)
def test_multibar_show_initial() -> None:
multibar = progressbar.MultiBar(show_initial=False)
multibar['bar'] = progressbar.ProgressBar(max_value=N)
multibar.render(force=True)
def test_multibar_empty_key() -> None:
multibar = progressbar.MultiBar()
multibar[''] = progressbar.ProgressBar(max_value=N)
for name in multibar:
assert name == ''
bar = multibar[name]
bar.update(1)
multibar.render(force=True)
def test_multibar_print() -> None:
bars = 5
n = 10
def print_sometimes(bar, probability):
for i in bar(range(n)):
# Sleep up to 0.1 seconds
time.sleep(random.random() * 0.1)
# print messages at random intervals to show how extra output works
if random.random() < probability:
bar.print('random message for bar', bar, i)
with progressbar.MultiBar() as multibar:
for i in range(bars):
# Get a progressbar
bar = multibar[f'Thread label here {i}']
bar.max_error = False
# Create a thread and pass the progressbar
# Print never, sometimes and always
threading.Thread(target=print_sometimes, args=(bar, 0)).start()
threading.Thread(target=print_sometimes, args=(bar, 0.5)).start()
threading.Thread(target=print_sometimes, args=(bar, 1)).start()
for i in range(5):
multibar.print(f'{i}', flush=False)
# Note: MultiBar inherits from dict, so update() would be
# dict.update and insert bogus entries; render() is intended here
multibar.render(force=True, flush=False)
multibar.render(force=True, flush=True)
def test_multibar_no_format() -> None:
with progressbar.MultiBar(
initial_format=None, finished_format=None
) as multibar:
bar = multibar['a']
for i in bar(range(5)):
bar.print(i)
def test_multibar_finished() -> None:
multibar = progressbar.MultiBar(initial_format=None, finished_format=None)
bar = multibar['bar'] = progressbar.ProgressBar(max_value=5)
bar2 = multibar['bar2']
multibar.render(force=True)
multibar.print('Hi')
multibar.render(force=True, flush=False)
for i in range(6):
bar.update(i)
bar2.update(i)
multibar.render(force=True)
def test_multibar_finished_format() -> None:
multibar = progressbar.MultiBar(
finished_format='Finished {label}', show_finished=True
)
bar = multibar['bar'] = progressbar.ProgressBar(max_value=5)
bar2 = multibar['bar2']
multibar.render(force=True)
multibar.print('Hi')
multibar.render(force=True, flush=False)
bar.start()
bar2.start()
multibar.render(force=True)
multibar.print('Hi')
multibar.render(force=True, flush=False)
for i in range(6):
bar.update(i)
bar2.update(i)
multibar.render(force=True)
def test_multibar_threads() -> None:
multibar = progressbar.MultiBar(finished_format=None, show_finished=True)
bar = multibar['bar'] = progressbar.ProgressBar(max_value=5)
multibar.start()
time.sleep(0.1)
bar.update(3)
time.sleep(0.1)
# join() waits until all bars have finished, so finish first
bar.finish()
multibar.join()
multibar.join()
multibar.render(force=True)
def test_multibar_instances_do_not_share_thread_state() -> None:
# Regression: D1 - thread primitives were class attributes shared
# between all MultiBar instances.
multibar_a = progressbar.MultiBar(fd=io.StringIO())
multibar_b = progressbar.MultiBar(fd=io.StringIO())
assert multibar_a._thread_finished is not multibar_b._thread_finished
assert multibar_a._thread_closed is not multibar_b._thread_closed
assert multibar_a._print_lock is not multibar_b._print_lock
def test_multibar_stop_does_not_poison_new_instances() -> None:
# Regression: D1 - stop() set a class-level Event, killing the render
# loop of every MultiBar created afterwards.
multibar = progressbar.MultiBar(fd=io.StringIO())
multibar.start()
multibar.stop(timeout=5)
fresh = progressbar.MultiBar(fd=io.StringIO())
assert not fresh._thread_finished.is_set()
def test_multibar_start_keeps_render_thread_alive() -> None:
# Regression: D6 - start() called _thread_closed.set() instead of
# clearing it, so an empty multibar's render thread exited before
# any bars could be added.
multibar = progressbar.MultiBar(fd=io.StringIO())
multibar.start()
try:
assert not multibar._thread_closed.is_set()
assert multibar._thread is not None
multibar._thread.join(timeout=0.5)
assert multibar._thread.is_alive()
finally:
multibar.stop(timeout=5)
def test_multibar_flush_does_not_emit_nul_bytes() -> None:
# Regression: D3 - flush() truncated the buffer without seeking back,
# so later writes padded the gap with NUL characters.
fd = io.StringIO()
multibar = progressbar.MultiBar(fd=fd)
multibar.print('hello')
multibar.print('world')
assert '\x00' not in fd.getvalue()
def test_multibar_prepend_and_append_label() -> None:
# Regression: D7 - the append_label branch was unreachable when
# prepend_label was enabled as well.
multibar = progressbar.MultiBar(
prepend_label=True,
append_label=True,
fd=io.StringIO(),
)
bar = progressbar.ProgressBar(
max_value=N,
widgets=['x'],
fd=io.StringIO(),
)
multibar['job'] = bar
multibar._label_bar(bar)
assert str(bar.widgets[0]).startswith('job')
assert str(bar.widgets[-1]).startswith('job')
def test_multibar_join_timeout_keeps_thread_reference() -> None:
# Regression: D8 - join(timeout) dropped the thread reference even
# when the thread was still running.
multibar = progressbar.MultiBar(fd=io.StringIO())
assert multibar['unfinished'] is not None # creates an unfinished bar
multibar.start()
try:
multibar.join(timeout=0.01)
assert multibar._thread is not None
assert multibar._thread.is_alive()
finally:
multibar.stop(timeout=5)
def test_multibar_exception_in_context_exits_promptly() -> None:
# Regression: D4 - an exception inside `with MultiBar()` hung forever
# in __exit__ because join() waited for bars that never finish.
holder: dict[str, progressbar.MultiBar] = {}
def scenario() -> None:
multibar = holder['multibar'] = progressbar.MultiBar(
fd=io.StringIO(),
)
# Pre-fix the event is shared class state which other tests may
# have set; post-fix this only touches this instance.
multibar._thread_finished.clear()
# The bar must exist before the render thread starts so the
# thread observes an unfinished bar.
multibar['a'].update(0)
with contextlib.suppress(RuntimeError), multibar:
raise RuntimeError('boom')
worker = threading.Thread(target=scenario, daemon=True)
worker.start()
worker.join(timeout=5)
try:
assert not worker.is_alive(), '__exit__ hung on unfinished bars'
finally:
# Unstick the render thread regardless of the outcome
holder['multibar']._thread_finished.set()
def test_multibar_concurrent_mutation() -> None:
# Regression: D2 - the render thread iterated self.values() without a
# snapshot while other threads add/remove bars.
errors: list[threading.ExceptHookArgs] = []
original_excepthook = threading.excepthook
threading.excepthook = errors.append
multibar = progressbar.MultiBar(fd=io.StringIO())
# Pre-fix the event is shared class state which other tests may have
# set; post-fix this only touches this instance.
multibar._thread_finished.clear()
assert multibar['keep'] is not None # creates an unfinished bar
multibar.start()
try:
for i in range(300):
assert multibar[f'bar {i}'] is not None
del multibar[f'bar {i}']
finally:
multibar.stop(timeout=5)
threading.excepthook = original_excepthook
assert not errors
assert not multibar._thread or not multibar._thread.is_alive()