-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathtest_read.py
More file actions
45 lines (38 loc) · 932 Bytes
/
test_read.py
File metadata and controls
45 lines (38 loc) · 932 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import subprocess as sp
from namedpipe import NPopen
def run_ffmpeg(pipe):
    """Start ffmpeg writing a synthetic raw video stream to *pipe*.

    ffmpeg is asked to render the lavfi ``testsrc`` pattern at 320x240 for
    ``d=5`` and to emit it as ``rawvideo`` in ``rgb24`` pixel format.  The
    output target is whatever ``pipe`` formats to as a string (presumably the
    pipe's path -- confirm against the pipe class).

    Args:
        pipe: Object whose string form is passed to ffmpeg as the output.

    Returns:
        A ``(process, nbytes)`` tuple: the running ``subprocess.Popen`` object
        and the byte count ``width * height * 3`` (three bytes per pixel,
        matching the ``rgb24`` format requested above).
    """
    width, height = 320, 240

    # fmt:off
    command = [
        "ffmpeg",
        "-y",
        "-f", "lavfi",
        "-i", f"testsrc=s={width}x{height}:d=5",
        "-f", "rawvideo",
        "-pix_fmt", "rgb24",
        f"{pipe}",
    ]
    # fmt:on

    process = sp.Popen(command)
    return process, width * height * 3
def test_read_all():
    """Read the whole ffmpeg output from a read-mode pipe, then wait for ffmpeg."""
    with NPopen("r") as pipe:
        # A pipe created with mode "r" must report read-only capabilities.
        assert pipe.readable()
        assert not pipe.writable()

        proc, frame_nbytes = run_ffmpeg(pipe)
        stream = pipe.wait()

        # Consume the stream in frame-sized chunks; a falsy (empty) read
        # result ends the loop.
        while True:
            if not stream.read(frame_nbytes):
                break

    # The stream has been fully drained, so let ffmpeg finish on its own.
    proc.wait()
def test_read_some():
    """Read only the first 30 frame-sized chunks, then stop ffmpeg early.

    Unlike ``test_read_all`` the stream is not drained: after 30 reads the
    pipe context is left and the still-running ffmpeg process is terminated.
    """
    with NPopen("r") as pipe:
        proc, nbytes = run_ffmpeg(pipe)
        f = pipe.wait()
        # The chunks themselves are not inspected; only the partial read
        # followed by an early shutdown is being exercised.
        for _ in range(30):
            f.read(nbytes)

    proc.kill()
    # Reap the killed child. Without this the process is left unreaped and
    # Popen emits a ResourceWarning when the object is garbage-collected.
    proc.wait()
# Allow running this module directly as a script; only the full-read test is
# executed in that case.
if __name__ == "__main__":
    test_read_all()