Skip to content

Commit 1dab109

Browse files
committed
Refactored backend modules to mimic
subprocess & io builtin modules: - ffmpegprocess - io - _utils/threaded_pipe - _utils/queue_pausable - _utils/pipe_nonblock
1 parent ff1c21a commit 1dab109

27 files changed

Lines changed: 3219 additions & 402 deletions

docsrc/requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,4 @@ sphinx-rtd-theme
33
sphinx-autopackagesummary
44
sphinxcontrib-blockdiag
55
matplotlib
6+
PyQt5

requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
-r tests/requirements.txt
22
-r docsrc/requirements.txt
3-
PyQt5
3+
yappi

sandbox/test_ffmpegio.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
import time
2+
from ffmpegio import ffmpeg, utils, audio
3+
import subprocess as sp
4+
from ffmpegio import io
5+
6+
# logging.basicConfig(level=logging.DEBUG)
7+
8+
9+
url = "tests/assets/testaudio-1m.mp3"
10+
sample_fmt = "s16"
11+
out_codec, dtype = utils.get_audio_format(sample_fmt)
12+
container = out_codec[4:]
13+
14+
fs, s = audio.read(url, sample_fmt=sample_fmt)
15+
16+
args = {
17+
"inputs": [("-", {"f": "mp3"})],
18+
"outputs": [("-", {"f": container, "c:a": out_codec, "sample_fmt": sample_fmt})],
19+
}
20+
21+
with open(url, "rb") as f:
22+
bytes = f.read() # byte array
23+
24+
25+
feed = io.QueuedWriter()
26+
read = io.QueuedReader()
27+
28+
proc = ffmpeg._run(sp.Popen, args, stdin=feed.fileno(), stdout=read.fileno())
29+
30+
try:
31+
feed.write(bytes)
32+
feed.mark_eof()
33+
34+
# time.sleep(0.1)
35+
# print(f'# of data blocks: {read._pipe.queue.qsize()}')
36+
37+
# time.sleep(0.1)
38+
# print(f'# of data blocks: {read._pipe.queue.qsize()}')
39+
40+
# proc.wait()
41+
# print(f'# of data blocks: {read._pipe.queue.qsize()}')
42+
43+
for i in range(100):
44+
data = read.read(8192, timeout=1)
45+
if data is None:
46+
break
47+
try:
48+
print(f"{i}: {len(data)} bytes")
49+
except io.TimeoutExpired:
50+
print(f"{i}: empty")
51+
except Exception as e:
52+
proc.kill()
53+
print(e)
54+
finally:
55+
feed._pipe.join()
56+
read._pipe.join()
57+
proc.wait()

sandbox/test_ffmpegque.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
import time
2+
from ffmpegio import ffmpeg, utils, audio
3+
import subprocess as sp
4+
from ffmpegio._utils import threaded_pipe as tp
5+
6+
# logging.basicConfig(level=logging.DEBUG)
7+
8+
9+
url = "tests/assets/testaudio-1m.mp3"
10+
sample_fmt = "s16"
11+
out_codec, dtype = utils.get_audio_format(sample_fmt)
12+
container = out_codec[4:]
13+
14+
fs, s = audio.read(url, sample_fmt=sample_fmt)
15+
16+
args = {
17+
"inputs": [("-", {"f": "mp3"})],
18+
"outputs": [("-", {"f": container, "c:a": out_codec, "sample_fmt": sample_fmt})],
19+
}
20+
21+
with open(url, "rb") as f:
22+
bytes = f.read() # byte array
23+
24+
25+
feed = tp.ThreadedPipe(True)
26+
read = tp.ThreadedPipe(False)
27+
28+
feed.start()
29+
read.start()
30+
31+
feed.open()
32+
read.open()
33+
34+
proc = ffmpeg._run(sp.Popen, args, stdin=feed.fileno(), stdout=read.fileno())
35+
36+
try:
37+
feed.queue.put(bytes)
38+
39+
time.sleep(0.1)
40+
print(f'# of data blocks: {read.queue.qsize()}')
41+
42+
time.sleep(0.1)
43+
print(f'# of data blocks: {read.queue.qsize()}')
44+
45+
for i in range(100):
46+
data = read.queue.get(True, 1)
47+
try:
48+
print(f"{i}: {len(data)} bytes")
49+
except tp.Empty:
50+
print(f"{i}: empty")
51+
except:
52+
proc.kill()
53+
finally:
54+
feed.join()
55+
read.join()
56+
proc.wait()

sandbox/test_pipeio.py

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
"""Experiment of asynchronous os.pipe()/os.read()/os.write()
2+
3+
3 threads, 2 pipes, 4 i/o block sizes:
4+
5+
1. feeder (tx to processor at rate x)
6+
2. processor (rx from feeder at rate x1 and tx to reader at rate y)
7+
3. reader (rx from processor at rate y1)
8+
9+
only reader rx is non-blocking
10+
11+
"""
12+
13+
from ffmpegio._utils import pipe_nonblock
14+
import os
15+
import threading
16+
import time
17+
import math
18+
19+
20+
def processor(stdin, stdout, nin, nout, nblks):
21+
print("[processor] starting")
22+
nintotal = 0
23+
noutottal = 0
24+
for i in range(nblks):
25+
print(f"[processor n={i}]")
26+
nread = 0
27+
while nread < nin:
28+
indata = os.read(stdin, nin - nread)
29+
nread += len(indata)
30+
if nread != nin:
31+
print(f"[processor] received {nread} expecting {nin}")
32+
time.sleep(1e-3)
33+
34+
os.write(stdout, b"0" * nout)
35+
36+
nintotal += len(indata)
37+
noutottal += nout
38+
print(
39+
f"[processor n={i}] rx {nintotal}/{nin*nblks}, tx {noutottal}/{nout*nblks}"
40+
)
41+
42+
os.close(stdin)
43+
os.close(stdout)
44+
print("[processor] exiting")
45+
46+
47+
def feeder(stdin, min, inblks):
48+
print("[feeder] starting")
49+
ntotal = 0
50+
for i in range(inblks):
51+
print(f"[feeder {i}]")
52+
os.write(stdin, b"0" * min)
53+
ntotal += min
54+
print(f"[feeder {i}] ntotal tx'ed = {ntotal}/{min*inblks}")
55+
os.close(stdin)
56+
print("[feeder] exiting")
57+
58+
59+
def reader(stdout, mout, outblks, timeout):
60+
print("[reader] starting")
61+
ntotal = 0
62+
for o in range(outblks):
63+
print(f"[reader {o}]")
64+
65+
nread = 0
66+
while nread < mout:
67+
try:
68+
data = pipe_nonblock.read(stdout, mout - nread)
69+
nread += len(data)
70+
# print(f"[reader {o}] rx'ed {nread}/{mout} bytes")
71+
except pipe_nonblock.NoData:
72+
# print(f"[reader {o}] no data")
73+
time.sleep(timeout)
74+
if nread > mout:
75+
raise RuntimeError(f"rx'ed too many: {nread}/{mout}")
76+
ntotal += nread
77+
print(f"[reader {o}] ntotal = {ntotal}/{mout*outblks}")
78+
79+
os.close(stdout)
80+
print("[reader] exiting")
81+
82+
83+
txout, txin = os.pipe()
84+
rxout, rxin = pipe_nonblock.pipe()
85+
86+
nblks = 3
87+
inblks = 5
88+
outblks = 7
89+
g = 8
90+
91+
blk = g * nblks * inblks * outblks // math.gcd(nblks, inblks, outblks, g)
92+
93+
nin = nblks * blk
94+
nout = g * nblks * blk
95+
timeout = 10e-3
96+
97+
98+
proc = threading.Thread(
99+
target=processor, args=(txout, rxin, nin // nblks, nout // nblks, nblks)
100+
)
101+
proc.start()
102+
103+
feed = threading.Thread(target=feeder, args=(txin, nin // inblks, inblks))
104+
feed.start()
105+
106+
read = threading.Thread(target=reader, args=(rxout, nout // outblks, outblks, timeout))
107+
read.start()
108+
109+
110+
proc.join()
111+
feed.join()
112+
read.join()

sandbox/test_queueio.py

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
"""Experiment of asynchronous os.pipe()/os.read()/os.write()
2+
3+
3 threads, 2 pipes, 4 i/o block sizes:
4+
5+
1. feeder (tx to processor at rate x)
6+
2. processor (rx from feeder at rate x1 and tx to reader at rate y)
7+
3. reader (rx from processor at rate y1)
8+
9+
only reader rx is non-blocking
10+
11+
"""
12+
13+
from ffmpegio._utils import threaded_pipe as tp
14+
import os
15+
import threading
16+
import time
17+
import math
18+
19+
20+
def processor(stdin, stdout, nin, nout, nblks):
21+
print("[processor] starting")
22+
nintotal = 0
23+
noutottal = 0
24+
for i in range(nblks):
25+
print(f"[processor n={i}]")
26+
nread = 0
27+
while nread < nin:
28+
indata = os.read(stdin, nin - nread)
29+
nread += len(indata)
30+
if nread != nin:
31+
print(f"[processor] received {nread} expecting {nin}")
32+
time.sleep(100e-3)
33+
34+
os.write(stdout, b"0" * nout)
35+
36+
nintotal += len(indata)
37+
noutottal += nout
38+
print(
39+
f"[processor n={i}] rx {nintotal}/{nin*nblks}, tx {noutottal}/{nout*nblks}"
40+
)
41+
42+
os.close(stdin)
43+
os.close(stdout)
44+
print("[processor] exiting")
45+
46+
47+
nblks = 3
48+
inblks = 1
49+
outblks = 7
50+
g = 4
51+
52+
blk = 32*g * nblks * inblks * outblks // math.gcd(nblks, inblks, outblks, g)
53+
54+
nin = nblks * blk
55+
nout = g * nblks * blk
56+
timeout = 10e-3
57+
58+
feed = tp.ThreadedPipe(True)
59+
read = tp.ThreadedPipe(False)
60+
61+
print(nin,nout)
62+
63+
feed.start()
64+
read.start()
65+
66+
feed.open()
67+
read.open()
68+
69+
proc = threading.Thread(
70+
target=processor,
71+
args=(feed.fileno(), read.fileno(), nin // nblks, nout // nblks, nblks),
72+
)
73+
74+
proc.start()
75+
76+
try:
77+
feed.queue.put(b"0" * nin)
78+
79+
for i in range(outblks):
80+
read.queue.get(True, 1)
81+
finally:
82+
proc.join()
83+
feed.join()
84+
read.join()

src/ffmpegio/_utils/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)