-
Notifications
You must be signed in to change notification settings - Fork 9
Expand file tree
/
Copy pathparser.py
More file actions
234 lines (198 loc) · 7.31 KB
/
parser.py
File metadata and controls
234 lines (198 loc) · 7.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
import re, os, shlex
from collections import abc
from ..filtergraph import Graph, Chain, Filter
from .. import devices
__all__ = ["parse", "compose", "FLAG"]
FLAG = None
# till v3.7 is dropped
form_shell_cmd = (
shlex.join
if hasattr(shlex, "join")
else lambda args: " ".join(shlex.quote(arg) for arg in args)
)
def parse_options(args):
"""parse command-line option arguments
:param args: argument string or sequence of arguments
:type args: str or seq of str
:return: parsed options. Flag options get None as their values.
:rtype: dict
"""
if isinstance(args, str):
args = shlex.split(args)
res = {}
n = len(args)
i = 0
while i < n:
key = args[i][1:]
i += 1
if i < n and args[i][0] != "-":
if key not in res:
res[key] = args[i]
elif isinstance(res[key], str):
res[key] = [res[key], args[i]]
else:
res[key].append(args[i])
i += 1
else:
res[key] = FLAG
return res
def parse(cmdline):
"""parse ffmpeg command line arguments
:param cmdline: full or partial ffmpeg command line string
:type cmdline: str or seq(str)
:return: ffmpegio FFmpeg argument dict
:rtype: dict
"""
from ..caps import options
if isinstance(cmdline, str):
# remove multi-line command
cmdline = re.sub(r"\\\n", " ", cmdline)
# split the command line into its options
args = shlex.split(cmdline)
else: # list of strs
args = cmdline
# exclude 'ffmpeg' command if present
if re.search(r'(?:^|[/\\])?ffmpeg(?:.exe)?"?$', args[0], re.IGNORECASE):
args = args[1:]
# extract global options
all_gopts = options("global")
all_lopts = options("per-file")
is_gopt = [len(s) and s[0] == "-" and s[1:] in all_gopts for s in args]
is_lopt = [not tf for tf in is_gopt]
gopts = {}
for i, s in enumerate(args):
if not is_gopt[i]:
continue
k = s[1:]
if k in ("h", "?", "help", "-help"): # special case take all args thereafter
gopts[k] = " ".join(args[i + 1 :])
elif all_gopts[k] is FLAG:
gopts[k] = FLAG
else:
gopts[k] = args[i + 1]
is_lopt[i + 1] = False
args = [s for s, tf in zip(args, is_lopt) if tf]
# identify -i options
ipos = [i + 2 for i, v in enumerate(args) if v == "-i"]
inputs = [
(args[i1 - 1], parse_options(args[i0 : i1 - 2]))
for i0, i1 in zip((0, *ipos[:-1]), ipos)
]
if len(inputs):
args = args[ipos[-1] :] # drop all input arguments
# identify output_urls
opos = [
i + 1
for i, v in enumerate(args)
if v[0] != "-" # must not be an option name/flag
and (
i == 0 # no output options
or args[i - 1][0] != "-" # prev arg specifies an output option value
or all_lopts.get(args[i - 1][1:].split(":", 1)[0], False)
is FLAG # prev arg is a flag
)
]
outputs = [
(args[i1 - 1], parse_options(args[i0 : i1 - 1]))
for i0, i1 in zip((0, *opos[:-1]), opos)
]
return dict(global_options=gopts, inputs=inputs, outputs=outputs)
def compose(args, command="", shell_command=False):
"""compose ffmpeg subprocess arguments from argument dict values
:param global_options: global options, defaults to None
:type global_options: dict, optional
:param inputs: list of input files and their options, defaults to []
:type inputs: seq of seq(str, dict or None), optional
:param outputs: list of output files and thier options, defaults to []
:type outputs: seq of seq(str, dict or None), optional
:param command: ffmpeg command, defaults to ""
:type command: str, optional
:param shell_command: True to output shell command ready string, defaults to False
:type shell_command: bool, optional
:returns: list of arguments (possibly missing the leading 'ffmpeg' command if `command`
is not given) or shell command string if `shell_command` is True
:rtype: list of str or str
If global_options is None and only 1 output file given, FFmpeg global options
may be specified as additional output options.
"""
def finalize_global(key, val):
return key, val
def finalize_output(key, val):
if re.match(r"s(?:\:|$)", key) and not isinstance(val, str):
val = "x".join((str(v) for v in val))
elif key == "map" and not isinstance(val, str):
# if an entry is a seq, join with ':'
val = [
v if isinstance(v, str) else ":".join((str(vi) for vi in v))
for v in val
]
return key, val
def finalize_input(key, val):
if re.match(r"s(?:\:|$)", key) and not isinstance(val, str):
val = "x".join((str(v) for v in val))
return key, val
def opts2args(opts, finalize):
# FFmpeg applies the last of the repeated options regardless of overall/per-stream
# need to parse per-stream options and group repeated options together and
# apply the overall option first
opts_parsed = {}
for itm in opts.items():
k, v = finalize(*itm)
oname, *sspec = k.split(":", 1)
o = opts_parsed.get(oname, None)
if o is None:
opts_parsed[oname] = o = {}
o[sspec[0] if len(sspec) else None] = v
def set_arg(karg, val):
if not isinstance(val, (str, Graph, Chain, Filter)) and isinstance(
val, abc.Sequence
):
for v in val:
args.extend([karg, str(v)])
else:
args.append(karg)
if val is not FLAG:
args.append(str(val))
args = []
for key, vals in opts_parsed.items():
kbase = f"-{key}"
if None in vals:
val = val = vals.pop(None)
set_arg(kbase, val)
for st, val in vals.items():
set_arg(f"{kbase}:{st}", val)
return args
def inputs2args(inputs):
args = []
for url, opts in inputs:
# resolve url enumeration if it's a device
url, opts = devices.resolve_source(url, opts)
if opts:
args.extend(opts2args(opts, finalize_input))
args.extend(
[
"-i",
str(url) if url is not None else os.devnull,
]
)
return args
def outputs2args(outputs):
args = []
for url, opts in outputs:
# resolve url enumeration if it's a device
url, opts = devices.resolve_sink(url, opts)
if opts:
args.extend(opts2args(opts, finalize_output))
args.append(
str(url)
if url is not None
else "/dev/null" if os.name != "nt" else "NUL"
)
return args
args = [
*([command] if command else []),
*opts2args(args.get("global_options", None) or {}, finalize_global),
*inputs2args(args.get("inputs", None) or ()),
*outputs2args(args.get("outputs", None) or ()),
]
return form_shell_cmd(args) if shell_command else args