-
Notifications
You must be signed in to change notification settings - Fork 9
Expand file tree
/
Copy pathFilter.py
More file actions
873 lines (736 loc) · 30.7 KB
/
Filter.py
File metadata and controls
873 lines (736 loc) · 30.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
from __future__ import annotations
from collections.abc import Generator, Sequence
import re
from functools import partial
from itertools import chain
from ..caps import filters as list_filters, filter_info, layouts, FilterInfo
from ..utils import filter as filter_utils
from .. import filtergraph as fgb
from .typing import PAD_INDEX
from .exceptions import *
__all__ = ["Filter"]
class Filter(fgb.abc.FilterGraphObject, tuple):
"""FFmpeg filter definition immutable class
:param filter_spec: _description_
:type filter_spec: _type_
:param filter_id: _description_, defaults to None
:type filter_id: _type_, optional
:param \\*opts: filter option values assigned in the order options are
declared
:type \\*opts: dict, optional
:param \\**kwopts: filter options in key=value pairs
:type \\**kwopts: dict, optional
"""
class Error(FFmpegioError):
pass
class InvalidName(Error):
def __init__(self, name):
from .. import path
super().__init__(
f"Filter {name} is not defined in FFmpeg (v{path.FFMPEG_VER}).\n"
)
class InvalidOption(Error):
pass
class Unsupported(Error):
def __init__(self, name, feature) -> None:
super().__init__(f"{feature} not yet supported feature for {name} filter.")
_info: dict[str, FilterInfo] = {}
@staticmethod
def _get_info(name: str) -> FilterInfo:
try:
info = Filter._info[name]
except KeyError:
try:
info = Filter._info[name] = list_filters()[name]
except:
raise Filter.InvalidName(name)
return info
def __new__(self, filter_spec, *args, filter_id=None, **kwargs):
"""_summary_"""
proto = []
if isinstance(filter_spec, Filter):
if filter_spec.id and filter_id is not None: # new id
proto.append((filter_spec.name, filter_id))
proto.extend(filter_spec[1:])
else:
proto.extend(filter_spec)
else:
# parse if str given
if isinstance(filter_spec, str):
filter_spec = filter_utils.parse_filter(filter_spec)
if not (isinstance(filter_spec, Sequence) and len(filter_spec)):
raise ValueError("filter_spec must be a non-empty sequence.")
name, *opts = filter_spec
if isinstance(name, str):
self._get_info(name)
proto.append((name, id) if isinstance(id, str) else name)
elif not (
isinstance(name, Sequence)
and len(name) != 2
and all((isinstance(i, str) for i in name))
):
raise ValueError(
"filter_spec[0] must be a str or 2-element str sequence."
)
else:
# name + id: re-id if id arg given
self._get_info(name[0])
proto.append(tuple(name) if filter_id is None else (name[0], filter_id))
proto.extend(opts)
# create named options dict
proto_dict = proto.pop() if isinstance(proto[-1], dict) else {}
# change ordered options if non-None value is given
nord = len(proto) - 1 # # of ordered options
for i, o in enumerate(args[:nord]):
if o is not None:
proto[i] = o
# add additional ordered options if present
proto.extend(args[nord:])
# update named options
if len(kwargs):
proto_dict.update(kwargs)
# validate named option keys to be str
for k in proto_dict:
if not isinstance(k, str):
raise ValueError(
"All keys of the named option dict must be of type str."
)
# add the named option dict to the prototype list
if len(proto_dict):
proto.append(proto_dict)
# create the final tuple
return tuple.__new__(Filter, proto)
def __getitem__(self, key):
value = tuple.__getitem__(self, key)
if isinstance(value, dict):
value = {**value}
if isinstance(value, tuple):
if isinstance(value[-1], dict):
value = tuple((*value[:-1], {**value[-1]}))
elif isinstance(value[0], dict):
value = tuple(({**value[-1]}, *value[1:]))
return value
def compose(
self,
show_unconnected_inputs: bool = False,
show_unconnected_outputs: bool = False,
):
"""compose filtergraph
:param show_unconnected_inputs: display [UNC#] on all unconnected input pads, defaults to True
:param show_unconnected_outputs: display [UNC#] on all unconnected output pads, defaults to True
"""
return (
fgb.Graph(self.data).compose(
show_unconnected_inputs, show_unconnected_outputs
)
if show_unconnected_inputs or show_unconnected_outputs
else filter_utils.compose_filter(*self)
)
def __repr__(self):
type_ = type(self)
return f"""<{type_.__module__}.{type_.__qualname__} object at {hex(id(self))}>
FFmpeg expression: \"{self.compose(True,True)}\"
Number of inputs: {self.get_num_inputs()}
Number of outputs: {self.get_num_outputs()}
"""
@property
def name(self):
name = self[0]
return name if isinstance(name, str) else name[0]
@property
def fullname(self):
name = self[0]
return name if isinstance(name, str) else f"{name[0]}@{name[1]}"
@property
def id(self):
name = self[0]
return None if isinstance(name, str) else name[1]
@property
def ordered_options(self):
opts = self[1:]
return opts[:-1] if isinstance(opts[-1], dict) else opts
@property
def named_options(self):
opts = self[-1]
return opts if isinstance(opts, dict) else {}
@property
def info(self):
try:
return filter_info(self.name)
except:
raise Filter.InvalidName(self.name)
def get_pad_media_type(self, port, pad_id):
try:
port = (
"inputs"
if "inputs".startswith(port)
else "outputs" if "outputs".startswith(port) else None
)
assert port is not None
except:
raise ValueError(
f"{port} is an invalid filter port type. Must be either 'input' or 'output'."
)
port_info = getattr(self.info, port)
if port_info is None:
# filters with homogeneous multiple in/out
# fmt:off
pure_video = {
"inputs": [
"bm3d", "decimate", "fieldmatch", "hstack", "interleave", "mergeplanes",
"mix", "premultiply", "signature", "streamselect", "unpremultiply",
"vstack", "xmedian", "xstack",
],
"outputs": [
"alphaextract", "extractplanes", "select", "split", "streamselect",
],
}
pure_audio = {
"inputs": [
"afir", "ainterleave", "amerge", "amix", "astreamselect", "headphone", "join", "ladspa",
],
"outputs": [
"acrossover", "aselect", "asplit", "astreamselect", "channelsplit",
],
}
# fmt:on
if self.name in pure_video[port]:
return "video"
if self.name in pure_audio[port]:
return "audio"
if self.name == "concat":
n = self.get_option_value("n")
v = self.get_option_value("v")
a = self.get_option_value("a")
return (
("video" if pad_id % n < v else "audio")
if port != "outputs"
else ("video" if pad_id < v else "audio")
)
# multiple pads possible if streams option set
if self.name in ("movie", "amovie"):
if self.get_option_value("streams") is None:
return "video" if self.name == "movie" else "audio"
# 2nd pad for audio visualization stream
vis_mode = ["afir", "aiir", "anequalizer", "ebur128", "aphasemeter"]
if port == "outputs" and self.name in vis_mode:
return "video" if pad_id else "audio"
raise Filter.Unsupported(self.name, "dynamic media type resolution")
try:
pad_info = port_info[pad_id]
return pad_info["type"]
except:
raise ValueError(
f"{pad_id} is an invalid pad_id as an {port[:-1]} pad of {self.name} filter."
)
def get_option_value(self, option_name):
# first check the named options as-is
named_opts = self.named_options
try:
return named_opts[option_name]
except:
pass
# get the option info
i, opt_info = next(
(
(i, o)
for i, o in enumerate(self.info.options)
if o.name == option_name or option_name in o.aliases
),
(None, None),
)
if i is None:
raise Filter.InvalidOption(
f"Invalid option name ({option_name}) for {self.name} filter"
)
try:
# try full name first
return named_opts[opt_info.name]
except:
# try alias name next
for a in opt_info.aliases:
try:
return named_opts[a]
except:
pass
# try from ordered options next
try:
return self.ordered_options[i]
except:
# if nothing fits, use the default value (maybe undefined/None)
return opt_info.default
def get_num_inputs(self):
"""get the number of input pads of the filter
:return: number of input pads
:rtype: int
"""
name = self.name
if not isinstance(name, str):
# name@id
name = name[0]
try:
nin = self._info[name].num_inputs
except:
raise Filter.InvalidName(name)
if nin is not None: # fixed number
return nin
def _inplace():
return 1 if self.get_option_value("inplace") else 2
def _headphone():
if self.get_option_value("hrir") == "multich":
return 2
map = self.get_option_value("map")
return (
len(re.split(r"\s*\|\s*", map)) + 1
if isinstance(map, str)
else len(map) + 1
)
def _mergeplanes():
map = self.get_option_value("mapping")
if not isinstance(map, int):
map = int(map, 16 if map.startswith("0x") else 10)
return int(max(f"{map:08x}"[::2])) + 1
def _concat():
return self.get_option_value("n") * (
self.get_option_value("v") + self.get_option_value("a")
)
option_name, inc = {
"afir": ("nbirs", 1),
"concat": (None, _concat),
"decimate": ("ppsrc", 1),
"fieldmatch": ("ppsrc", 1),
"headphone": (None, _headphone),
"interleave": ("nb_inputs", 0),
"limitdiff": ("reference", 1),
"mergeplanes": (None, _mergeplanes),
"premultiply": (None, _inplace),
"unpremultiply": (None, _inplace),
"signature": ("nb_inputs", 0),
# "astreamselect": ("inputs", 0),
# "bm3d": ("inputs", 0),
# "hstack": ("inputs", 0),
# "mix": ("inputs", 0),
# "streamselect": ("inputs", 0),
# "vstack": ("inputs", 0),
# "xmedian": ("inputs", 0),
# "xstack": ("inputs", 0),
}.get(name, ("inputs", 0))
return (
int(self.get_option_value(option_name)) + inc
if isinstance(option_name, str)
else inc()
)
def get_num_outputs(self):
"""get the number of output pads of the filter
:return: number of output pads
:rtype: int
"""
name = self.name
try:
nout = self._info[name].num_outputs
except:
raise Filter.InvalidName(name)
if nout is not None: # arbitrary number allowed
return nout
def _concat():
return int(self.get_option_value("a")) + int(self.get_option_value("v"))
def _list_var(opt, sep, inc):
v = self.get_option_value(opt)
return (
len(v)
if sep == r"\|" and not isinstance(v, str)
else len(re.split(rf"\s*{sep}\s*", v))
) + inc
def _channelsplit():
layout = self.get_option_value("channel_layout")
channels = self.get_option_value("channels")
return len(
re.split(
rf"\s*\+\s*",
layouts()["layouts"][layout] if channels == "all" else channels,
)
)
# fmt:off
option_name, inc = {
"afir": ("response", 1), # +video stream
"aiir": ("response", 1), # +video stream
"anequalizer": ("curves", 1),
"ebur128": ("video", 1),
"aphasemeter": ("video", 1),
"acrossover": ('split',partial( _list_var,"split", " ", 1)), # split option (space-separated)
"asegment": ("timestamps", partial( _list_var,"timestamps", r"\|", 1)),
"segment": ("timestamps", partial( _list_var,"timestamps", r"\|", 1)),
"astreamselect": ("map", partial( _list_var,"map", " ", 0)), # parse map?
"streamselect": ("map", partial( _list_var,"map", " ", 0)), # parse map?
"extractplanes": ("planes", partial( _list_var,"planes", r"\+", 0)), # parse planes
"amovie": ("streams",partial( _list_var,"streams", r"\+", 0)),
"movie": ("streams",partial( _list_var,"streams", r"\+", 0)),
"channelsplit": (('channel_layout', 'channels'),_channelsplit), # parse channel_layout/channels
"concat": (('a', 'v'), _concat), # sum a and v
# "aselect": (("output", "n"), 0), # must resolve alias...
# "asplit": ("outputs", 0),
# "select": (("output", "n"), 0),
# "split": ("outputs", 0),
}.get(name, ("outputs", 0))
# fmt:on
return (
int(self.get_option_value(option_name)) + inc
if isinstance(inc, int)
else inc()
)
def get_num_filters(self, chain: int) -> int:
"""get the number of filters of the specfied chain
:param chain: id of the chain
"""
if chain:
raise ValueError(f"{chain=} is invalid. Filter object only has 1 chain.")
return 1
def get_num_chains(self) -> int:
"""get the number of chains"""
return 1
def add_label(
self,
label: str,
inpad: PAD_INDEX | Sequence[PAD_INDEX] = None,
outpad: PAD_INDEX = None,
force: bool = None,
) -> fgb.Graph:
"""label a filter pad
:param label: name of the new label. Square brackets are optional.
:param inpad: input filter pad index or a sequence of pads, defaults to None
:param outpad: output filter pad index, defaults to None
:param force: True to delete existing labels, defaults to None
:return: actual label name
Only one of inpad and outpad argument must be given.
If given label already exists, no new label will be created.
If inpad indices are given, the label must be an input stream specifier.
If label has a trailing number, the number will be dropped and replaced with an
internally assigned label number.
"""
# must convert to FilterGraph as it's the only object with labels
fg = fgb.Graph([[self]])
return fg.add_label(label, inpad, outpad, force)
def _iter_pads(
self,
n: int,
pad: int | None,
filter: Literal[0] | None,
chain: Literal[0] | None,
exclude_chainable: bool,
chainable_first: bool,
chainable_only: bool,
) -> Generator[tuple[PAD_INDEX, Filter]]:
"""Iterate over input pads of the filter
:param n: number of pads
:param pad: pad id
:param filter: filter index
:param chain: chain index
:param exclude_chainable: True to leave out the last pads
:param chainable_first: True to yield the last pad first then the rest
:yield: filter pad index, link label, filter object
"""
if not n:
# takes no input, nothing to iterate
return
if (isinstance(filter, int) and filter != 0) or (
isinstance(chain, int) and chain != 0
):
# Filter alone can have no connections so yields no pad
raise FiltergraphInvalidIndex(f"Invalid {filter=} or {chain=} id")
if pad is not None:
if pad < 0: # resolve negative pad index
pad += n
if pad < 0 or pad >= (n - 1 if exclude_chainable else n):
raise FiltergraphInvalidIndex(f"Invalid {pad=} id")
if chainable_only:
if pad is None:
pad = n - 1
if pad != n - 1:
raise FiltergraphInvalidIndex(f"Invalid {pad=} is not chainable pad")
if not exclude_chainable and pad == n - 1:
yield (pad,), self, None
elif pad is None:
if chainable_first or exclude_chainable:
n = n - 1
if chainable_first and not exclude_chainable:
yield (n,), self, None
for j in range(n):
yield (j,), self, None
else:
yield (pad,), self, None
def iter_input_pads(
self,
pad: int | None = None,
filter: Literal[0] | None = None,
chain: Literal[0] | None = None,
*,
exclude_chainable: bool = False,
chainable_first: bool = False,
include_connected: bool = False,
unlabeled_only: bool = False,
chainable_only: bool = False,
full_pad_index: bool = False,
) -> Generator[tuple[PAD_INDEX, Filter, None]]:
"""Iterate over input pads of the filter
:param pad: pad id, defaults to None
:param filter: filter index, defaults to None
:param chain: chain index, defaults to None
:param exclude_chainable: True to leave out the last input pads, defaults to False (all avail pads)
:param chainable_first: True to yield the last input first then the rest, defaults to False
:param include_connected: True to include pads connected to input streams, defaults to False
:param unlabeled_only: True to leave out named inputs, defaults to False to return all inputs
:param chainable_only: True to only iterate chainable pads, defaults to False to return all inputs
:param full_pad_index: True to return 3-element index
:yield: filter pad index, link label, filter object, output pad index of connected filter if connected
"""
for index, filter, other_index in self._iter_pads(
self.get_num_inputs(),
pad,
filter,
chain,
exclude_chainable,
chainable_first,
chainable_only,
):
yield (
((0, 0, *index), filter, other_index)
if full_pad_index
else (index, filter, other_index)
)
def iter_output_pads(
self,
pad: int | None = None,
filter: Literal[0] | None = None,
chain: Literal[0] | None = None,
*,
exclude_chainable: bool = False,
chainable_first: bool = False,
include_connected: bool = False,
unlabeled_only: bool = False,
chainable_only: bool = False,
full_pad_index: bool = False,
) -> Generator[tuple[PAD_INDEX, Filter, PAD_INDEX | None]]:
"""Iterate over output pads of the filter
:param pad: pad id, defaults to None
:param filter: filter index, defaults to None
:param chain: chain index, defaults to None
:param exclude_chainable: True to leave out the last output pads, defaults to False (all avail pads)
:param chainable_first: True to yield the last output first then the rest, defaults to False
:param include_connected: True to include pads connected to output streams, defaults to False
:param unlabeled_only: True to leave out named outputs, defaults to False to return only all outputs
:param chainable_only: True to only iterate chainable pads, defaults to False to return all outputs
:param full_pad_index: True to return 3-element index
:yield: filter pad index, link label, filter object, output pad index of connected filter if connected
"""
for index, filter, other_index in self._iter_pads(
self.get_num_outputs(),
pad,
filter,
chain,
exclude_chainable,
chainable_first,
chainable_only,
):
yield (
((0, 0, *index), filter, other_index)
if full_pad_index
else (index, filter, other_index)
)
def iter_chains(
self,
skip_if_no_input: bool = False,
skip_if_no_output: bool = False,
chainable_only: bool = False,
) -> Generator[tuple[int, fgb.Chain]]:
"""iterate over chains of the filtergraphobject
:param skip_if_no_input: True to skip chains without available input pads, defaults to False
:param skip_if_no_output: True to skip chains without available output pads, defaults to False
:param chainable_only: True to further restrict ``skip_if_no_input`` and ``skip_if_no_input``
arguments to require chainable input or output, defaults to False to
allow any input/output
:yield: chain id and chain object
"""
if (not skip_if_no_input or self.get_num_inputs()) and (
not skip_if_no_output or self.get_num_outputs()
):
yield (0, fgb.Chain([self]))
def _connect(
self,
right: fgb.abc.FilterGraphObject,
fwd_links: list[tuple[PAD_INDEX, PAD_INDEX]],
bwd_links: list[tuple[PAD_INDEX, PAD_INDEX]],
chain_siso: bool = True,
replace_sws_flags: bool | None = None,
) -> fgb.Graph:
"""combine another filtergraph object and make downstream connections (worker)
:param right: other filtergraph
:param fwd_links: a list of tuples, pairing self's output pad and right's ipnut pad
:param bwd_links: a list of tuples, pairing right's output pad and self's ipnut pad
:param to_right: input pad ids or labels of the `right` fg
:param chain_siso: True to chain the single-input single-output connection, default: True
:param replace_sws_flags: True to use `right` sws_flags if present,
False to drop `right` sws_flags,
None to throw an exception (default)
:return: new filtergraph object
* link labels may be auto-renamed if there is a conflict
"""
if not isinstance(right, fgb.Filter):
# right is more complex filtergraph object
return right._rconnect(
self, fwd_links, bwd_links, chain_siso, replace_sws_flags
)
if chain_siso and self.get_num_outputs() == 1 and right.get_num_inputs() == 1:
return fgb.Chain([self, right])
# create iterators to organize the links in (input, output) of the combined graph
it_fwd = (((1, 0, r[2]), l) for (l, r) in fwd_links)
it_bwd = ((l, (1, 0, r[2])) for (r, l) in bwd_links)
return fgb.Graph(
[[self], [right]],
{i: link for i, link in enumerate(chain(it_fwd, it_bwd))},
)
def _rconnect(
self,
left: fgb.abc.FilterGraphObject,
fwd_links: list[tuple[PAD_INDEX, PAD_INDEX]],
bwd_links: list[tuple[PAD_INDEX, PAD_INDEX]],
chain_siso: bool = True,
replace_sws_flags: bool | None = None,
) -> fgb.Graph:
"""combine another filtergraph object and make upstream connections (worker)
:param right: other filtergraph
:param fwd_links: a list of tuples, pairing left's output pad and self's ipnut pad
:param bwd_links: a list of tuples, pairing self's output pad and left's ipnut pad
:param chain_siso: True to chain the single-input single-output connection, default: True
:param replace_sws_flags: True to use `right` sws_flags if present,
False to drop `right` sws_flags,
None to throw an exception (default)
:return: new filtergraph object
* link labels may be auto-renamed if there is a conflict
"""
if not isinstance(left, fgb.Filter):
# left is more complex filtergraph object
return left._connect(
self, fwd_links, bwd_links, chain_siso, replace_sws_flags
)
if chain_siso and left.get_num_outputs() == 1 and self.get_num_inputs() == 1:
return fgb.Chain([left, self])
# create iterators to organize the links in (input, output) of the combined graph
it_fwd = (((1, 0, r[2]), l) for (l, r) in fwd_links)
it_bwd = ((l, (1, 0, r[2])) for (r, l) in bwd_links)
return fgb.Graph(
[[left], [self]],
{i: link for i, link in enumerate(chain(it_fwd, it_bwd))},
)
def _stack(
self,
other: fgb.abc.FilterGraphObject,
auto_link: bool = False,
replace_sws_flags: bool | None = None,
) -> fgb.Graph:
"""stack another Graph to this Graph
:param other: other filtergraph
:param auto_link: True to connect matched I/O labels, defaults to None
:param replace_sws_flags: True to use other's sws_flags if present,
False to ignore other's sws_flags,
None to throw an exception (default)
:return: new filtergraph object
Remarks
-------
- extend() and import links
- If `auto-link=False`, common labels may be renamed.
- For more explicit linking rather than the auto-linking, use `connect()` instead.
TO-CHECK/TO-DO: what happens if common link labels are already linked
"""
other = fgb.as_filtergraph_object(other)
# if other is not a filter, elevate self to match first
return (
fgb.Graph([[self], [other]])
if isinstance(other, fgb.Filter)
else fgb.as_filtergraph_object_like(self, other)._stack(
other, auto_link, replace_sws_flags
)
)
def apply(self, options, filter_id=None):
"""apply new filter options
:param options: new option key-value pairs. For ordered option, use positional index (0
corresponds to the first option). Set value as None to drop the option.
Ordered options can only be dropped in contiguous fashion, including the
last ordered option.
:type options: dict
:param filter_id: new filter id, defaults to None
:type filter_id: str, optional
:return: new filter with modified options
:rtype: Filter
.. note::
To add new ordered options, int-keyed options item must be presented in
the increasing key order so the option can be expanded one at a time.
"""
try:
assert isinstance(self[-1], dict)
kwopts = dict(self[-1])
try:
opts = list(self[1:-1])
except:
opts = []
except:
kwopts = {}
try:
opts = list(self[1:])
except:
opts = []
nopts = len(opts)
delopts = set()
for k, v in options.items():
if type(k) == int:
if k < 0 or k > nopts:
raise Filter.Error(f"invalid positional index [{k}]")
if v is not None:
if k < nopts:
opts[k] = v
else:
opts = [*opts, v]
nopts += 1
elif k < 0 or k > nopts:
delopts.add(k)
else:
if v is None:
del kwopts[k]
else:
kwopts[k] = v
if len(delopts):
delopts = sorted(list(delopts))
o1 = delopts[0] - 1
on = delopts[-1]
if on != nopts or len(delopts) != on - o1:
raise Filter.Error(
f"cannot drop specified ordered options {delopts}. They must be contiguous and include the last ordered option."
)
opts = opts[:o1]
return Filter(self[0], *opts, filter_id=filter_id, **kwopts)
def _input_pad_is_available(self, index: tuple[int, int, int]) -> bool:
pad_pos = index[2]
return pad_pos >= 0 and pad_pos < self.get_num_inputs()
def _output_pad_is_available(self, index: tuple[int, int, int]) -> bool:
pad_pos = index[2]
return pad_pos >= 0 and pad_pos < self.get_num_outputs()
def _check_partial_pad_index(
self, index: tuple[int | None, int | None, int | None], is_input: bool
) -> bool:
"""True if defined values of the partial pad index are valid"""
if any(i is not None and i > 0 for i in index[:2]):
return False
pad = index[2]
if pad is None:
pad = 0 # use the smallest pad id
n = self.get_num_inputs() if is_input else self.get_num_outputs()
return pad >= 0 and pad < n
def _input_pad_is_chainable(self, index: tuple[int, int, int]) -> bool:
"""True if specified input pad is chainable"""
if any(i for i in index[:2]):
return False
return index[2] == self.get_num_inputs() - 1
def _output_pad_is_chainable(self, index: tuple[int, int, int]) -> bool:
"""True if specified output pad is chainable"""
if any(i for i in index[:2]):
return False
return index[2] == self.get_num_outputs() - 1