|
22 | 22 | ) |
23 | 23 |
|
24 | 24 |
|
25 | | -def _get_stream_name(name): |
26 | | - return '{}'.format(name) |
27 | | - |
28 | | - |
29 | 25 | def _convert_kwargs_to_cmd_line_args(kwargs): |
30 | 26 | args = [] |
31 | 27 | for k in sorted(kwargs.keys()): |
@@ -85,7 +81,7 @@ def _allocate_filter_stream_names(filter_nodes, outgoing_edge_maps, stream_name_ |
85 | 81 | # TODO: automatically insert `splits` ahead of time via graph transformation. |
86 | 82 | raise ValueError('Encountered {} with multiple outgoing edges with same upstream label {!r}; a ' |
87 | 83 | '`split` filter is probably required'.format(upstream_node, upstream_label)) |
88 | | - stream_name_map[upstream_node, upstream_label] = _get_stream_name('s{}'.format(stream_count)) |
| 84 | + stream_name_map[upstream_node, upstream_label] = 's{}'.format(stream_count) |
89 | 85 | stream_count += 1 |
90 | 86 |
|
91 | 87 |
|
@@ -137,7 +133,7 @@ def get_args(stream_spec, overwrite_output=False): |
137 | 133 | output_nodes = [node for node in sorted_nodes if isinstance(node, OutputNode)] |
138 | 134 | global_nodes = [node for node in sorted_nodes if isinstance(node, GlobalNode)] |
139 | 135 | filter_nodes = [node for node in sorted_nodes if isinstance(node, FilterNode)] |
140 | | - stream_name_map = {(node, None): _get_stream_name(i) for i, node in enumerate(input_nodes)} |
| 136 | + stream_name_map = {(node, None): str(i) for i, node in enumerate(input_nodes)} |
141 | 137 | filter_arg = _get_filter_arg(filter_nodes, outgoing_edge_maps, stream_name_map) |
142 | 138 | args += reduce(operator.add, [_get_input_args(node) for node in input_nodes]) |
143 | 139 | if filter_arg: |
|
0 commit comments