"""ffmpegio.filtergraph module - FFmpeg filtergraph classes

Arithmetic Filtergraph Construction
===================================

.. table:: Supported Arithmetic Operators

   ================================  ==============================================================
   Operation                         Description
   ================================  ==============================================================
   `+` operator                      Chaining/join operator, supports scalar expansion
   `Filter + Filter -> Chain`        Create a filterchain from 2 filters
   `Chain + Filter -> Chain`         Append filter to filterchain
   `Filter + Chain -> Chain`         Prepend filter to filterchain
   `Chain + Chain -> Chain`          Concatenate filterchains
   `Filter + Graph -> Graph`         Prepend filter to first available input of each chain
   `Graph + Filter -> Graph`         Append filter to first available output of each chain
   `Graph + Chain -> Graph`          Append filterchain to first available output of each chain
   `Chain + Graph -> Graph`          Prepend filterchain to first available input of each chain
   `Graph + Graph -> Graph`          Join 2 graphs by matching their inputs and outputs in order
   `*` operator                      Multiply-and-stack operator
   `Filter * int -> Graph`           Stack the filter (int) times
   `Chain * int -> Graph`            Stack the chain (int) times
   `Graph * int -> Graph`            Stack the input graph (int) times
   `|` operator                      Stacking operator
   `Filter | Filter -> Graph`        Stack the filters
   `Chain | Filter -> Graph`         Stack chain and filter
   `Filter | Chain -> Graph`         Stack filter and chain
   `Chain | Chain -> Graph`          Stack the filterchains
   `Filter | Graph -> Graph`         Prepend filter as a new chain
   `Graph | Filter -> Graph`         Append filter as a new chain
   `Graph | Chain -> Graph`          Stack graph and chain
   `Chain | Graph -> Graph`          Stack chain and graph
   `Graph | Graph -> Graph`          Stack filtergraphs
   left `>>` operator                Input labeling or attach input filter/chain
   `str >> Filter -> Graph`          Label first available input pad*
   `str >> Chain -> Graph`           Label first available input pad*
   `str >> Graph -> Graph`           Label first available chainable input pad*
   `Filter >> Graph -> Graph`        Attach filter output to first available input pad
   `Chain >> Graph -> Graph`         Attach chain output to first available input pad
   `(_,Index) >> Filter -> Graph`    Specify input pad
   `(_,Index) >> Chain -> Graph`     Specify input pad of the first filter
   `(_,Index) >> Graph -> Graph`     Specify input pad
   right `>>` operator               Output labeling or attach output filter/chain
   `Filter >> str -> Graph`          Label first available output pad*
   `Chain >> str -> Graph`           Label first available output pad*
   `Graph >> str -> Graph`           Label first available chainable output pad*
   `Graph >> Filter -> Graph`        Attach filter to the first available output pad
   `Graph >> Chain -> Graph`         Attach chain to the first available output pad
   `Filter >> (Index,_) -> Graph`    Specify output pad
   `Chain >> (Index,_) -> Graph`     Specify output pad
   `Graph >> (Index,_) -> Graph`     Specify output pad
   ================================  ==============================================================

Filter Pad Labeling
===================

`str >> Filter/Chain/Graph` and `Filter/Chain/Graph >> str` operations can be used to set input
and output labels, respectively. The labels must be enclosed in square brackets, in the same
manner as in the FFmpeg filtergraph specification.

.. code-block:: python

   fg = '[in]' >> Filter('scale',0.5,-1) >> '[out]'

The brackets are required to distinguish labels from str expressions of a filter, chain, or graph.
For example, the following expression chains the `scale` and `setsar` filters:

.. code-block:: python

   fg = '[in]' >> Filter('scale',0.5,-1) + 'setsar=1/1' >> '[out]'

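The bracket rule can be illustrated without FFmpeg. The sketch below is a standalone helper and
not part of ffmpegio: `looks_like_label` is a hypothetical name, and it only approximates how a
link label could be told apart from a filter expression.

```python
def looks_like_label(expr):
    # A label is a str of the form '[name]' with no other brackets inside.
    if not isinstance(expr, str) or len(expr) < 3:
        return False
    inner = expr[1:-1]
    return (
        expr[0] == "["
        and expr[-1] == "]"
        and "[" not in inner
        and "]" not in inner
    )


# '[in]' is read as a label, while 'setsar=1/1' is read as a filter expression
print(looks_like_label("[in]"), looks_like_label("setsar=1/1"))
```
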
Filter Pad Indexing
===================

Both input and output filter pads can be specified in a number of ways:

=====================  ========================================================================
Syntax                 Description
=====================  ========================================================================
int n                  Specifies the n-th pad of the first available filter
(int m, int n)         Specifies the n-th pad of the m-th filter of the first available chain
(int k, int m, int n)  Specifies the n-th pad of the m-th filter of the k-th chain
str label              Specifies the pad associated with the link label (no bracket necessary)
=====================  ========================================================================

Except for label indexing, which is a Graph-specific feature, every indexing syntax may be
used with `Filter`, `Chain`, or `Graph` class instances. An irrelevant field (e.g., chain or filter
indexing for a `Filter` instance) is ignored. Standard negative-number indexing is supported.
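
As a rough illustration of the numeric index forms, the following standalone sketch normalizes
them into a `(chain, filter, pad)` triple and resolves a negative pad number. The helper names
`split_pad_index` and `wrap_index` are hypothetical and do not exist in ffmpegio; `None` stands
for a field left to the "first available" rule.

```python
def split_pad_index(index):
    # Normalize int, (m, n), or (k, m, n) to a (chain, filter, pad) triple.
    # None marks a field left to the 'first available' rule.
    if isinstance(index, int):
        return (None, None, index)
    if isinstance(index, tuple) and 1 <= len(index) <= 3:
        return (None,) * (3 - len(index)) + index
    raise TypeError("index must be an int or a tuple of 1 to 3 ints")


def wrap_index(i, n):
    # Map a possibly negative index onto range(n), as Python sequences do.
    j = i + n if i < 0 else i
    if not 0 <= j < n:
        raise IndexError(f"index {i} is out of range for {n} pads")
    return j


print(split_pad_index((1, 0)))  # filter 1, pad 0 of the first available chain
print(wrap_index(-1, 3))  # last of 3 pads
```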
"""
from collections import UserList, abc
from contextlib import contextmanager
from functools import partial, reduce
from copy import deepcopy
import itertools
from math import floor, log10
import os
import re
from subprocess import PIPE
from tempfile import NamedTemporaryFile
from . import path
from .caps import filters as list_filters, filter_info, layouts
from .utils import filter as filter_utils, is_stream_spec
from .utils.fglinks import GraphLinks
from .errors import FFmpegioError


class FilterOperatorTypeError(TypeError, FFmpegioError):
    def __init__(self, other) -> None:
        super().__init__(
            f"invalid filtergraph operation with an incompatible object of type {type(other)}"
        )


class FiltergraphMismatchError(TypeError, FFmpegioError):
    def __init__(self, n, m) -> None:
        # n: number of outputs of the first graph, m: number of inputs of the second
        super().__init__(
            f"cannot append mismatched filtergraphs: the first has {n} outputs "
            f"while the second has {m} inputs available."
        )


class FiltergraphInvalidIndex(TypeError, FFmpegioError):
    pass


def _check_joinable(src, dst):
    n = src.get_num_outputs()
    m = dst.get_num_inputs()
    if not (n and m):
        raise FiltergraphMismatchError(n, m)
    return n == 1 and m == 1


def _is_label(expr):
    return isinstance(expr, str) and re.match(r"\[[^\[\]]+\]$", expr)


class FiltergraphPadNotFoundError(FFmpegioError):
    def __init__(self, type, index) -> None:
        target = (
            f"pad {index}"
            if isinstance(index, tuple)
            else f"label {index}"
            if isinstance(index, str)
            else f"filter {index}"
        )
        super().__init__(f"cannot find {type} pad at {target}")


def as_filter(filter_spec):
    if isinstance(filter_spec, Graph):
        # a Graph qualifies only if it holds exactly one chain with exactly one filter
        if len(filter_spec) != 1 or len(filter_spec[0]) != 1:
            raise FFmpegioError(
                "Only a Graph object with a single one-element chain can be downconverted to Filter."
            )
        else:
            return filter_spec[0, 0]

    if isinstance(filter_spec, Chain):
        if len(filter_spec) != 1:
            raise FFmpegioError(
                "Only a Chain object with a single element can be downconverted to Filter."
            )
        else:
            # the sole element of the chain is the Filter itself
            return filter_spec[0]

    return filter_spec if isinstance(filter_spec, Filter) else Filter(filter_spec)


def as_filterchain(filter_specs, copy=False):
    if isinstance(filter_specs, Graph):
        if len(filter_specs) != 1:
            raise FFmpegioError(
                "Only a Graph object with a single chain can be downconverted to Chain."
            )
        return Chain(filter_specs[0])

    return (
        filter_specs
        if not copy and isinstance(filter_specs, Chain)
        else Chain([filter_specs] if isinstance(filter_specs, Filter) else filter_specs)
    )


def as_filtergraph(filter_specs, copy=False):
    return (
        filter_specs
        if not copy and isinstance(filter_specs, Graph)
        else Graph(filter_specs)
    )


def as_filtergraph_object(filter_specs):
    if isinstance(filter_specs, (Filter, Chain, Graph)):
        return filter_specs

    try:
        assert isinstance(filter_specs, str)
        specs, links, sws_flags = filter_utils.parse_graph(filter_specs)
        n = len(specs)
        if links or sws_flags or n > 1:
            return Graph(specs, links, sws_flags)
        specs = specs[0]
        return Filter(specs[0]) if len(specs) == 1 else Chain(specs)
    except:
        # not a parsable str: try the narrowest class first, then widen
        try:
            return as_filter(filter_specs)
        except:
            try:
                return as_filterchain(filter_specs)
            except:
                return as_filtergraph(filter_specs)


def _shift_labels(obj, label_type, args):
    if _is_label(args):
        return obj.add_labels(label_type, args)
    if all(_is_label(arg) for arg in args):
        return obj.add_labels(label_type, args)
    is_dst = label_type == "dst"
    # (label, index) for an input pad, (index, label) for an output pad
    assert len(args) == 2 and _is_label(args[0 if is_dst else 1])
    return obj.add_labels(
        label_type, {obj._resolve_index(is_dst, args[is_dst]): args[not is_dst]}
    )


###################################################################################################
# FILTER TOOLS


class Filter(tuple):
    """FFmpeg filter definition immutable class

    :param filter_spec: filter name, FFmpeg filter expression, `(name, *options)`
                        sequence, or an existing Filter to copy
    :type filter_spec: str or Sequence or Filter
    :param filter_id: filter instance id (the `id` of `name@id`), defaults to None
    :type filter_id: str, optional
    :param \\*opts: filter option values assigned in the order options are
                    declared
    :type \\*opts: tuple, optional
    :param \\**kwopts: filter options in key=value pairs
    :type \\**kwopts: dict, optional
    """

    class Error(FFmpegioError):
        pass

    class InvalidName(Error):
        def __init__(self, name):
            super().__init__(
                f"Filter {name} is not defined in FFmpeg (v{path.FFMPEG_VER}).\n"
            )

    class InvalidOption(Error):
        pass

    class Unsupported(Error):
        def __init__(self, name, feature) -> None:
            super().__init__(f"{feature} is not yet supported for {name} filter.")

    def __new__(cls, filter_spec, *args, filter_id=None, **kwargs):
        """Create a Filter from `filter_spec`, overriding its options with `args` and `kwargs`"""

        proto = []

        if isinstance(filter_spec, Filter):
            if filter_spec.id and filter_id is not None:  # new id
                proto.append((filter_spec.name, filter_id))
                proto.extend(filter_spec[1:])
            else:
                proto.extend(filter_spec)
        else:
            # parse if str given
            if isinstance(filter_spec, str):
                filter_spec = filter_utils.parse_filter(filter_spec)

            if not (isinstance(filter_spec, abc.Sequence) and len(filter_spec)):
                raise ValueError("filter_spec must be a non-empty sequence.")

            name, *opts = filter_spec
            if isinstance(name, str):
                # attach the id only if the filter_id argument is given
                proto.append((name, filter_id) if isinstance(filter_id, str) else name)
            elif not (
                isinstance(name, abc.Sequence)
                and len(name) == 2
                and all((isinstance(i, str) for i in name))
            ):
                raise ValueError(
                    "filter_spec[0] must be a str or 2-element str sequence."
                )
            else:
                # name + id: re-id if id arg given
                proto.append(tuple(name) if filter_id is None else (name[0], filter_id))
            proto.extend(opts)

        # create named options dict (copied so the source spec is never mutated)
        proto_dict = dict(proto.pop()) if isinstance(proto[-1], dict) else {}

        # change ordered options if non-None value is given
        nord = len(proto) - 1  # number of ordered options
        for i, o in enumerate(args[:nord]):
            if o is not None:
                proto[i + 1] = o  # +1 skips the filter name stored in proto[0]

        # add additional ordered options if present
        proto.extend(args[nord:])

        # update named options
        if len(kwargs):
            proto_dict.update(kwargs)

        # validate named option keys to be str
        for k in proto_dict:
            if not isinstance(k, str):
                raise ValueError(
                    "All keys of the named option dict must be of type str."
                )

        # add the named option dict to the prototype list
        if len(proto_dict):
            proto.append(proto_dict)

        # create the final tuple
        return tuple.__new__(Filter, proto)

    def _resolve_index(self, is_input, index):
        try:
            if isinstance(index, tuple):
                assert len(index) in (1, 2, 3)
                i = index[-1]
            elif isinstance(index, int):
                i = index
            elif index is None:
                i = -1  # pick the last input (chainable)
            else:
                assert False
            n = self.get_num_inputs() if is_input else self.get_num_outputs()
            if i < 0:
                i = n + i
            assert i >= 0 and i < n
            return i
        except:
            raise FiltergraphPadNotFoundError("input" if is_input else "output", index)

    def __getitem__(self, key):
        value = super().__getitem__(key)
        # hand out copies of option dicts so the Filter itself stays immutable
        if isinstance(value, dict):
            value = {**value}
        if isinstance(value, tuple) and len(value):  # an empty slice has no dict to copy
            if isinstance(value[-1], dict):
                value = tuple((*value[:-1], {**value[-1]}))
            elif isinstance(value[0], dict):
                value = tuple(({**value[0]}, *value[1:]))
        return value

    def __str__(self):
        return filter_utils.compose_filter(*self)

    def __repr__(self):
        type_ = type(self)
        return f"""<{type_.__module__}.{type_.__qualname__} object at {hex(id(self))}>
    FFmpeg expression: \"{str(self)}\"
    Number of inputs: {self.get_num_inputs()}
    Number of outputs: {self.get_num_outputs()}
"""

    @property
    def name(self):
        name = self[0]
        return name if isinstance(name, str) else name[0]

    @property
    def fullname(self):
        name = self[0]
        return name if isinstance(name, str) else f"{name[0]}@{name[1]}"

    @property
    def id(self):
        name = self[0]
        return None if isinstance(name, str) else name[1]

    @property
    def ordered_options(self):
        opts = self[1:]
        # opts is empty if the filter has no option at all
        return opts[:-1] if len(opts) and isinstance(opts[-1], dict) else opts

    @property
    def named_options(self):
        opts = self[-1]
        return opts if isinstance(opts, dict) else {}

    @property
    def info(self):
        try:
            return filter_info(self.name)
        except:
            raise Filter.InvalidName(self.name)

    def get_pad_media_type(self, port, pad_id):
        try:
            port = (
                "inputs"
                if "inputs".startswith(port)
                else "outputs"
                if "outputs".startswith(port)
                else None
            )
            assert port is not None
        except:
            raise ValueError(
                f"{port} is an invalid filter port type. Must be either 'input' or 'output'."
            )

        port_info = getattr(self.info, port)

        if port_info is None:
            # filters with homogeneous multiple in/out
            # fmt:off
            pure_video = {
                "inputs": [
                    "bm3d", "decimate", "fieldmatch", "hstack", "interleave", "mergeplanes",
                    "mix", "premultiply", "signature", "streamselect", "unpremultiply",
                    "vstack", "xmedian", "xstack",
                ],
                "outputs": [
                    "alphaextract", "extractplanes", "select", "split", "streamselect",
                ],
            }
            pure_audio = {
                "inputs": [
                    "afir", "ainterleave", "amerge", "amix", "astreamselect", "headphone", "join", "ladspa",
                ],
                "outputs": [
                    "acrossover", "aselect", "asplit", "astreamselect", "channelsplit",
                ],
            }
            # fmt:on
            if self.name in pure_video[port]:
                return "video"
            if self.name in pure_audio[port]:
                return "audio"

            if self.name == "concat":
                v = int(self.get_option_value("v"))
                a = int(self.get_option_value("a"))
                # every segment supplies v video pads followed by a audio pads;
                # the output pads follow the same order
                return (
                    ("video" if pad_id % (v + a) < v else "audio")
                    if port != "outputs"
                    else ("video" if pad_id < v else "audio")
                )

            # multiple pads possible if streams option set
            if self.name in ("movie", "amovie"):
                if self.get_option_value("streams") is None:
                    return "video" if self.name == "movie" else "audio"

            # 2nd pad for audio visualization stream
            vis_mode = ["afir", "aiir", "anequalizer", "ebur128", "aphasemeter"]
            if port == "outputs" and self.name in vis_mode:
                return "video" if pad_id else "audio"

            raise Filter.Unsupported(self.name, "dynamic media type resolution")

        try:
            pad_info = port_info[pad_id]
            return pad_info["type"]
        except:
            raise ValueError(
                f"{pad_id} is an invalid pad_id as an {port[:-1]} pad of {self.name} filter."
            )

    def get_option_value(self, option_name):
        # first check the named options as-is
        named_opts = self.named_options
        try:
            return named_opts[option_name]
        except:
            pass

        # get the option info
        i, opt_info = next(
            (
                (i, o)
                for i, o in enumerate(self.info.options)
                if o.name == option_name or option_name in o.aliases
            ),
            (None, None),
        )
        if i is None:
            raise Filter.InvalidOption(
                f"Invalid option name ({option_name}) for {self.name} filter"
            )

        try:
            # try full name first
            return named_opts[opt_info.name]
        except:
            # try alias name next
            for a in opt_info.aliases:
                try:
                    return named_opts[a]
                except:
                    pass

        # try from ordered options next
        try:
            return self.ordered_options[i]
        except:
            # if nothing fits, use the default value (maybe undefined/None)
            return opt_info.default

    def get_num_inputs(self):
        """get the number of input pads of the filter

        :return: number of input pads
        :rtype: int
        """
        name = self.name
        if not isinstance(name, str):
            # name@id
            name = name[0]
        try:
            nin = list_filters()[name].num_inputs
        except:
            raise Filter.InvalidName(name)
        if nin is not None:  # fixed number
            return nin

        def _inplace():
            return 1 if self.get_option_value("inplace") else 2

        def _headphone():
            if self.get_option_value("hrir") == "multich":
                return 2
            map = self.get_option_value("map")
            return (
                len(re.split(r"\s*\|\s*", map)) + 1
                if isinstance(map, str)
                else len(map) + 1
            )

        def _mergeplanes():
            map = self.get_option_value("mapping")
            if not isinstance(map, int):
                map = int(map, 16 if map.startswith("0x") else 10)
            return int(max(f"{map:08x}"[::2])) + 1

        def _concat():
            return self.get_option_value("n") * (
                self.get_option_value("v") + self.get_option_value("a")
            )

        option_name, inc = {
            "afir": ("nbirs", 1),
            "concat": (None, _concat),
            "decimate": ("ppsrc", 1),
            "fieldmatch": ("ppsrc", 1),
            "headphone": (None, _headphone),
            "interleave": ("nb_inputs", 0),
            "limitdiff": ("reference", 1),
            "mergeplanes": (None, _mergeplanes),
            "premultiply": (None, _inplace),
            "unpremultiply": (None, _inplace),
            "signature": ("nb_inputs", 0),
            # "astreamselect": ("inputs", 0),
            # "bm3d": ("inputs", 0),
            # "hstack": ("inputs", 0),
            # "mix": ("inputs", 0),
            # "streamselect": ("inputs", 0),
            # "vstack": ("inputs", 0),
            # "xmedian": ("inputs", 0),
            # "xstack": ("inputs", 0),
        }.get(name, ("inputs", 0))

        return (
            int(self.get_option_value(option_name)) + inc
            if isinstance(option_name, str)
            else inc()
        )

    def get_num_outputs(self):
        """get the number of output pads of the filter

        :return: number of output pads
        :rtype: int
        """
        name = self.name
        try:
            nout = list_filters()[name].num_outputs
        except:
            raise Filter.InvalidName(name)
        if nout is not None:  # fixed number
            return nout

        def _concat():
            return int(self.get_option_value("a")) + int(self.get_option_value("v"))

        def _list_var(opt, sep, inc):
            v = self.get_option_value(opt)
            return (
                len(v)
                if sep == r"\|" and not isinstance(v, str)
                else len(re.split(rf"\s*{sep}\s*", v))
            ) + inc

        def _channelsplit():
            layout = self.get_option_value("channel_layout")
            channels = self.get_option_value("channels")
            return len(
                re.split(
                    r"\s*\+\s*",
                    layouts()["layouts"][layout] if channels == "all" else channels,
                )
            )

        # fmt:off
        option_name, inc = {
            "afir": ("response", 1), # +video stream
            "aiir": ("response", 1), # +video stream
            "anequalizer": ("curves", 1),
            "ebur128": ("video", 1),
            "aphasemeter": ("video", 1),
            "acrossover": ("split", partial(_list_var, "split", " ", 1)), # split option (space-separated)
            "asegment": ("timestamps", partial(_list_var, "timestamps", r"\|", 1)),
            "segment": ("timestamps", partial(_list_var, "timestamps", r"\|", 1)),
            "astreamselect": ("map", partial(_list_var, "map", " ", 0)), # parse map?
            "streamselect": ("map", partial(_list_var, "map", " ", 0)), # parse map?
            "extractplanes": ("planes", partial(_list_var, "planes", r"\+", 0)), # parse planes
            "amovie": ("streams", partial(_list_var, "streams", r"\+", 0)),
            "movie": ("streams", partial(_list_var, "streams", r"\+", 0)),
            "channelsplit": (("channel_layout", "channels"), _channelsplit), # parse channel_layout/channels
            "concat": (("a", "v"), _concat), # sum a and v
            # "aselect": (("output", "n"), 0), # must resolve alias...
            # "asplit": ("outputs", 0),
            # "select": (("output", "n"), 0),
            # "split": ("outputs", 0),
        }.get(name, ("outputs", 0))
        # fmt:on

        return (
            int(self.get_option_value(option_name)) + inc
            if isinstance(inc, int)
            else inc()
        )

    def add_labels(self, pad_type, labels):
        """turn into filtergraph and add labels

        :param pad_type: filter pad type
        :type pad_type: 'dst'|'src'
        :param labels: pad label(s) and optionally pad id
        :type labels: str|seq(str)|dict(int:str), optional
        """
        fg = Graph([[self]])
        if labels is not None:
            if isinstance(labels, str):
                fg.add_label(labels, **{pad_type: (0, 0, 0)})
            elif isinstance(labels, dict):
                # 'dst' labels name input pads, 'src' labels name output pads
                is_input = pad_type == "dst"
                for pad, label in labels.items():
                    fg.add_label(label, **{pad_type: fg._resolve_index(is_input, pad)})
            else:
                for pad, label in enumerate(labels):
                    fg.add_label(label, **{pad_type: (0, 0, pad)})
        return fg

    def apply(self, options, filter_id=None):
        """apply new filter options

        :param options: new option key-value pairs. For ordered option, use positional index (0
                        corresponds to the first option). Set value as None to drop the option.
                        Ordered options can only be dropped in contiguous fashion, including the
                        last ordered option.
        :type options: dict
        :param filter_id: new filter id, defaults to None
        :type filter_id: str, optional
        :return: new filter with modified options
        :rtype: Filter

        .. note::

            To add new ordered options, int-keyed option items must be presented in
            increasing key order so the options can be expanded one at a time.
        """
        try:
            assert isinstance(self[-1], dict)
            kwopts = dict(self[-1])
            try:
                opts = list(self[1:-1])
            except:
                opts = []
        except:
            kwopts = {}
            try:
                opts = list(self[1:])
            except:
                opts = []
        nopts = len(opts)
        delopts = set()

        for k, v in options.items():
            if type(k) == int:
                if k < 0 or k > nopts:
                    raise Filter.Error(f"invalid positional index [{k}]")
                if v is not None:
                    if k < nopts:
                        opts[k] = v
                    else:
                        opts = [*opts, v]
                        nopts += 1
                elif k < nopts:
                    # mark an existing ordered option to be dropped
                    delopts.add(k)
            else:
                if v is None:
                    del kwopts[k]
                else:
                    kwopts[k] = v

        if len(delopts):
            delopts = sorted(delopts)
            o1 = delopts[0]  # first dropped index
            on = delopts[-1]  # last dropped index
            # dropped indices must form a contiguous run ending at the last option
            if on != nopts - 1 or len(delopts) != on - o1 + 1:
                raise Filter.Error(
                    f"cannot drop specified ordered options {delopts}. They must be contiguous and include the last ordered option."
                )
            opts = opts[:o1]

        # wrap the name in a 1-tuple so a (name, id) pair is not unpacked as name + option
        return Filter((self[0],), *opts, filter_id=filter_id, **kwopts)

    def __add__(self, other):
        # join
        try:
            other = as_filter(other)
        except Exception:
            return NotImplemented
        if _check_joinable(self, other):
            # one-to-one -> chain
            return Chain([self, other])
        else:
            # one-to-many or many-to-one -> stack and link
            return Graph([[self], [other]], {0: ((1, 0, 0), (0, 0, 0))})

    def __radd__(self, other):
        # join
        try:
            other = as_filter(other)
        except Exception:
            return NotImplemented
        if _check_joinable(other, self):
            # one-to-one -> chain
            return Chain([other, self])
        else:
            # one-to-many or many-to-one -> stack and link
            return Graph([[other], [self]], {0: ((1, 0, 0), (0, 0, 0))})

    def __mul__(self, __n):
        return Graph([[self]] * __n) if isinstance(__n, int) else NotImplemented

    def __rmul__(self, __n):
        return Graph([[self]] * __n) if isinstance(__n, int) else NotImplemented

    def __or__(self, other):
        # stack
        try:
            other = as_filter(other)
        except:
            return NotImplemented
        return Graph([[self], [other]])

    def __ror__(self, other):
        # stack
        if isinstance(other, int):
            return Graph([[self]] * other)
        try:
            other = as_filter(other)
        except:
            return NotImplemented
        return Graph([[other], [self]])

    def __rshift__(self, other):
        """self >> other | self >> (index, other)"""

        # try labeling first
        try:
            return _shift_labels(self, "src", other)
        except FFmpegioError:
            raise
        except:
            pass

        # resolve the index
        if type(other) == tuple:
            if len(other) > 2:
                index, other_index, other = other
            else:
                index, other = other
                other_index = None
        else:
            index = None
            other_index = None
        index = self._resolve_index(False, index)

        # convert other to a Filter, Chain, or Graph object
        try:
            other = as_filtergraph_object(other)
        except:
            return NotImplemented

        # if other is a Chain or Graph, use its >> operator
        if not isinstance(other, Filter):
            return other.__rrshift__((self, index, other_index))

        if other.get_num_inputs() == 0:
            raise FiltergraphMismatchError(self.get_num_outputs(), 0)

        # equivalent to add operation or stack and link
        return (
            self.__add__(other)
            if index + 1 == self.get_num_outputs()
            else Graph([[self], [other]], {0: ((1, 0, 0), (0, 0, index))})
        )

    def __rrshift__(self, other):
        """other >> self, (other, index) >> self : attach input label or filter"""

        # try to label first
        try:
            return _shift_labels(self, "dst", other)
        except FFmpegioError:
            raise
        except:
            pass

        # resolve the index
        if type(other) == tuple:
            if len(other) > 2:
                other, other_index, index = other
            else:
                other, index = other
                other_index = None
        else:
            index = None
            other_index = None
        index = self._resolve_index(True, index)

        # if label
        if _is_label(other):
            if other_index is None:
                return self.add_labels("dst", {index: other})
            else:
                raise FiltergraphInvalidIndex("index cannot be assigned to a label")

        # convert other to a Filter, Chain, or Graph object
        try:
            other = as_filtergraph_object(other)
        except:
            return NotImplemented

        # if other is a Chain or Graph, use its >> operator
        if not isinstance(other, Filter):
            return other.__rshift__((other_index, index, self))

        if other.get_num_outputs() == 0:
            raise FiltergraphMismatchError(0, self.get_num_inputs())

        if not index:
            # equivalent to chain/add operation
            return self.__radd__(other)
        else:
            # stack and link
            return Graph([[other], [self]], {0: ((1, 0, index), (0, 0, 0))})


####################################################################################


class Chain(UserList):
    """List of FFmpeg filters, connected in series

    Chain() to instantiate empty Chain object

    Chain(obj) to copy-instantiate Chain object from another

    Chain('...') to parse an FFmpeg filtergraph expression

    :param filter_specs: single-in-single-out filtergraph description without
                         labels, defaults to None
    :type filter_specs: str or seq(Filter), optional
    """

    class Error(FFmpegioError):
        pass

    def __init__(self, filter_specs=None):
        # convert str to a list of filter_specs
        if isinstance(filter_specs, str):
            filter_specs, links, sws_flags = filter_utils.parse_graph(filter_specs)
            if links:
                raise ValueError(
                    "filter_specs with link labels cannot be represented by the Chain class. Use Graph."
                )
            if sws_flags:
                raise ValueError(
                    "filter_specs with sws_flags cannot be represented by the Chain class. Use Graph."
                )
            if len(filter_specs) != 1:
                raise ValueError(
                    "filter_specs str must resolve to a single-chain filtergraph. Use the Graph class instead."
                )
            filter_specs = filter_specs[0]
        elif isinstance(filter_specs, Filter):
            filter_specs = [filter_specs]

        super().__init__(
            () if filter_specs is None else (as_filter(fspec) for fspec in filter_specs)
        )

    def _resolve_index(self, is_input, index):
        try:
            if isinstance(index, tuple):
                assert len(index) in (1, 2, 3)
                i = index[-1]
                try:
                    j = index[-2]
                except:
                    j = None
            else:
                j = None
                i = index if isinstance(index, int) else None
            return next(
                (self.iter_input_pads if is_input else self.iter_output_pads)(
                    filter=j, pad=i
                )
            )[:2]
        except:
            raise FiltergraphPadNotFoundError("input" if is_input else "output", index)

    def __str__(self):
        return filter_utils.compose_graph([self.data])

    def __repr__(self):
        type_ = type(self)
        return f"""<{type_.__module__}.{type_.__qualname__} object at {hex(id(self))}>
    FFmpeg expression: \"{str(self)}\"
    Number of filters: {len(self.data)}
    Input pads ({self.get_num_inputs()}): {', '.join((str(id[:-1]) for id in self.iter_input_pads()))}
    Output pads ({self.get_num_outputs()}): {', '.join((str(id[:-1]) for id in self.iter_output_pads()))}
"""

    def __setitem__(self, key, value):
        super().__setitem__(key, as_filter(value))

    def append(self, item):
        return super().append(as_filter(item))

    def extend(self, other):
        return super().extend([as_filter(f) for f in other])

    def insert(self, i, item):
        return super().insert(i, as_filter(item))

    def __contains__(self, item):
        # membership is tested by filter name
        item = as_filter(item)
        return any((f.name == item.name for f in self.data))

    def __mul__(self, __n):
        res = super().__mul__(__n)
        _check_joinable(self, self)
        return res

    def __rmul__(self, __n):
        return self.__mul__(__n)

    def __add__(self, other):
        # chain
        try:
            other = as_filterchain(other)
        except Exception:
            return NotImplemented