-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathtest_write.py
More file actions
48 lines (39 loc) · 1.01 KB
/
test_write.py
File metadata and controls
48 lines (39 loc) · 1.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import numpy as np
import subprocess as sp
from namedpipe import NPopen
from tempfile import TemporaryDirectory
from os import path
def run_ffmpeg(pipe, dstdir):
    """Launch ffmpeg to encode raw RGB24 video read from *pipe* into an MP4.

    Args:
        pipe: Object whose ``str()`` form is handed to ffmpeg as the ``-i``
            input (presumably the named pipe's path -- confirm against
            the namedpipe package).
        dstdir: Directory in which ``output.mp4`` is created.

    Returns:
        A 3-tuple ``(process, frame_shape, output_path)``: the running
        ``subprocess.Popen`` object, the ``[320, 240, 3]`` size list, and
        the path of the MP4 file ffmpeg was asked to write.
    """
    target = path.join(dstdir, "output.mp4")

    # NOTE(review): the first two entries are used as WIDTHxHEIGHT for "-s"
    # and the whole list is returned for use as an array shape. A
    # (height, width, channels) layout would be [240, 320, 3]; the byte count
    # per frame is the same either way -- confirm the intended orientation.
    frame_shape = [320, 240, 3]

    # fmt:off
    command = [
        "ffmpeg",
        "-y",
        "-f", "rawvideo",
        "-pix_fmt", "rgb24",
        "-s", f"{frame_shape[0]}x{frame_shape[1]}",
        "-r", "30",
        "-i", str(pipe),
        target,
    ]
    # fmt:on

    encoder = sp.Popen(command)
    return encoder, frame_shape, target
def test_write():
    """Stream 30 random RGB frames to ffmpeg over a named pipe and verify it.

    A write-mode ``NPopen`` pipe is created, ffmpeg is started reading from
    it via ``run_ffmpeg``, and 30 random ``uint8`` frames are written to the
    pipe. The test fails if ffmpeg exits with a non-zero status or if
    ffprobe cannot read the resulting file.

    Raises:
        AssertionError: If ffmpeg returns a non-zero exit code.
        subprocess.CalledProcessError: If ffprobe rejects the output file.
    """
    print("pipe server")
    with TemporaryDirectory() as dstdir:
        with NPopen("w") as pipe:
            proc, shape, outpath = run_ffmpeg(pipe, dstdir)
            # Blocks until the reader (ffmpeg) connects, then yields the
            # writable stream.
            f = pipe.wait()
            for _ in range(30):
                # randint's upper bound is exclusive; 256 covers the full
                # uint8 range (0..255).
                frame = np.random.randint(0, 256, shape, np.uint8)
                f.write(frame)

        # Wait only after the pipe context has exited: ffmpeg keeps reading
        # until it sees end-of-input, which is expected to happen once the
        # NPopen context closes the pipe.
        returncode = proc.wait()
        assert returncode == 0, f"ffmpeg exited with status {returncode}"

        # check=True makes an unreadable/missing output file fail the test
        # instead of being silently ignored.
        sp.run(["ffprobe", outpath], check=True)
# Allow running this test directly as a script, without a test runner.
if __name__ == "__main__":
    test_write()