|
22 | 22 | )
|
23 | 23 |
|
24 | 24 |
|
25 |
| -def _get_stream_name(name): |
26 |
| - return '{}'.format(name) |
27 |
| - |
28 |
| - |
29 | 25 | def _convert_kwargs_to_cmd_line_args(kwargs):
|
30 | 26 | args = []
|
31 | 27 | for k in sorted(kwargs.keys()):
|
@@ -85,7 +81,7 @@ def _allocate_filter_stream_names(filter_nodes, outgoing_edge_maps, stream_name_
|
85 | 81 | # TODO: automatically insert `splits` ahead of time via graph transformation.
|
86 | 82 | raise ValueError('Encountered {} with multiple outgoing edges with same upstream label {!r}; a '
|
87 | 83 | '`split` filter is probably required'.format(upstream_node, upstream_label))
|
88 |
| - stream_name_map[upstream_node, upstream_label] = _get_stream_name('s{}'.format(stream_count)) |
| 84 | + stream_name_map[upstream_node, upstream_label] = 's{}'.format(stream_count) |
89 | 85 | stream_count += 1
|
90 | 86 |
|
91 | 87 |
|
@@ -137,7 +133,7 @@ def get_args(stream_spec, overwrite_output=False):
|
137 | 133 | output_nodes = [node for node in sorted_nodes if isinstance(node, OutputNode)]
|
138 | 134 | global_nodes = [node for node in sorted_nodes if isinstance(node, GlobalNode)]
|
139 | 135 | filter_nodes = [node for node in sorted_nodes if isinstance(node, FilterNode)]
|
140 |
| - stream_name_map = {(node, None): _get_stream_name(i) for i, node in enumerate(input_nodes)} |
| 136 | + stream_name_map = {(node, None): str(i) for i, node in enumerate(input_nodes)} |
141 | 137 | filter_arg = _get_filter_arg(filter_nodes, outgoing_edge_maps, stream_name_map)
|
142 | 138 | args += reduce(operator.add, [_get_input_args(node) for node in input_nodes])
|
143 | 139 | if filter_arg:
|
|
0 commit comments