8000 #17: move graph stuff to dag.py; add edge labelling · Powercoder64/ffmpeg-python@6a9a12e · GitHub
[go: up one dir, main page]

Skip to content

Commit 6a9a12e

Browse files
committed
kkroening#17: move graph stuff to dag.py; add edge labelling
1 parent fc946be commit 6a9a12e

File tree

4 files changed

+206
-131
lines changed

4 files changed

+206
-131
lines changed

ffmpeg/_run.py

Lines changed: 8 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from __future__ import unicode_literals
22

3+
from .dag import topo_sort
34
from functools import reduce
45
from past.builtins import basestring
56
import copy
@@ -34,8 +35,8 @@ def _convert_kwargs_to_cmd_line_args(kwargs):
3435

3536

3637
def _get_input_args(input_node):
37-
if input_node._name == input.__name__:
38-
kwargs = copy.copy(input_node._kwargs)
38+
if input_node.name == input.__name__:
39+
kwargs = copy.copy(input_node.kwargs)
3940
filename = kwargs.pop('filename')
4041
fmt = kwargs.pop('format', None)
4142
video_size = kwargs.pop('video_size', None)
@@ -51,27 +52,6 @@ def _get_input_args(input_node):
5152
return args
5253

5354

54-
def _topo_sort(start_node):
55-
marked_nodes = []
56-
sorted_nodes = []
57-
child_map = {}
58-
def visit(node, child):
59-
assert node not in marked_nodes, 'Graph is not a DAG'
60-
if child is not None:
61-
if node not in child_map:
62-
child_map[node] = []
63-
child_map[node].append(child)
64-
if node not in sorted_nodes:
65-
marked_nodes.append(node)
66-
[visit(parent, node) for parent in node._parents]
67-
marked_nodes.remove(node)
68-
sorted_nodes.append(node)
69-
unmarked_nodes = [start_node]
70-
while unmarked_nodes:
71-
visit(unmarked_nodes.pop(), None)
72-
return sorted_nodes, child_map
73-
74-
7555
def _get_filter_spec(i, node, stream_name_map):
7656
stream_name = _get_stream_name('v{}'.format(i))
7757
stream_name_map[node] = stream_name
@@ -86,20 +66,20 @@ def _get_filter_arg(filter_nodes, stream_name_map):
8666

8767

8868
def _get_global_args(node):
89-
if node._name == overwrite_output.__name__:
69+
if node.name == overwrite_output.__name__:
9070
return ['-y']
9171
else:
9272
assert False, 'Unsupported global node: {}'.format(node)
9373

9474

9575
def _get_output_args(node, stream_name_map):
9676
args = []
97-
if node._name != merge_outputs.__name__:
77+
if node.name != merge_outputs.__name__:
9878
stream_name = stream_name_map[node._parents[0]]
9979
if stream_name != '[0]':
10080
args += ['-map', stream_name]
101-
if node._name == output.__name__:
102-
kwargs = copy.copy(node._kwargs)
81+
if node.name == output.__name__:
82+
kwargs = copy.copy(node.kwargs)
10383
filename = kwargs.pop('filename')
10484
fmt = kwargs.pop('format', None)
10585
if fmt:
@@ -116,7 +96,7 @@ def get_args(node):
11696
"""Get command-line arguments for ffmpeg."""
11797
args = []
11898
# TODO: group nodes together, e.g. `-i somefile -r somerate`.
119-
sorted_nodes, child_map = _topo_sort(node)
99+
sorted_nodes, child_map = topo_sort([node])
120100
del(node)
121101
input_nodes = [node for node in sorted_nodes if isinstance(node, InputNode)]
122102
output_nodes = [node for node in sorted_nodes if isinstance(node, OutputNode) and not

ffmpeg/dag.py

Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
from builtins import object
2+
from collections import namedtuple
3+
import copy
4+
import hashlib
5+
6+
7+
def _recursive_repr(item):
8+
"""Hack around python `repr` to deterministically represent dictionaries.
9+
10+
This is able to represent more things than json.dumps, since it does not require things to be JSON serializable
11+
(e.g. datetimes).
12+
"""
13+
if isinstance(item, basestring):
14+
result = str(item)
15+
elif isinstance(item, list):
16+
result = '[{}]'.format(', '.join([_recursive_repr(x) for x in item]))
17+
elif isinstance(item, dict):
18+
kv_pairs = ['{}: {}'.format(_recursive_repr(k), _recursive_repr(item[k])) for k in sorted(item)]
19+
result = '{' + ', '.join(kv_pairs) + '}'
20+
else:
21+
result = repr(item)
22+
return result
23+
24+
25+
def _get_hash(item):
26+
hasher = hashlib.sha224()
27+
repr_ = _recursive_repr(item)
28+
hasher.update(repr_.encode('utf-8'))
29+
return hasher.hexdigest()
30+
31+
32+
class DagNode(object):
33+
"""Node in a directed-acyclic graph (DAG).
34+
35+
Edges:
36+
DagNodes are connected by edges. An edge connects two nodes with a label for each side:
37+
- ``upstream_node``: upstream/parent node
38+
- ``upstream_label``: label on the outgoing side of the upstream node
39+
- ``downstream_node``: downstream/child node
40+
- ``downstream_label``: label on the incoming side of the downstream node
41+
42+
For example, DagNode A may be connected to DagNode B with an edge labelled "foo" on A's side, and "bar" on B's
43+
side:
44+
45+
_____ _____
46+
| | | |
47+
| A >[foo]---[bar]> B |
48+
|_____| |_____|
49+
50+
Edge labels may be integers or strings, and nodes cannot have more than one incoming edge with the same label.
51+
52+
DagNodes may have any number of incoming edges and any number of outgoing edges. DagNodes keep track only of
53+
their incoming edges, but the entire graph structure can be inferred by looking at the furthest downstream
54+
nodes and working backwards.
55+
56+
Hashing:
57+
DagNodes must be hashable, and two nodes are considered to be equivalent if they have the same hash value.
58+
59+
Nodes are immutable, and the hash should remain constant as a result. If a node with new contents is required,
60+
create a new node and throw the old one away.
61+
62+
String representation:
63+
In order for graph visualization tools to show useful information, nodes must be representable as strings. The
64+
``repr`` operator should provide a more or less "full" representation of the node, and the ``short_repr``
65+
property should be a shortened, concise representation.
66+
67+
Again, because nodes are immutable, the string representations should remain constant.
68+
"""
69+
def __hash__(self):
70+
"""Return an integer hash of the node."""
71+
raise NotImplementedError()
72+
73+
def __eq__(self, other):
74+
"""Compare two nodes; implementations should return True if (and only if) hashes match."""
75+
raise NotImplementedError()
76+
77+
def __repr__(self, other):
78+
"""Return a full string representation of the node."""
79+
raise NotImplementedError()
80+
81+
@property
82+
def short_repr(self):
83+
"""Return a partial/concise representation of the node."""
84+
raise NotImplementedError()
85+
86+
@property
87+
def incoming_edge_map(self):
88+
"""Provides information about all incoming edges that connect to this node.
89+
90+
The edge map is a dictionary that maps an ``incoming_label`` to ``(outgoing_node, outgoing_label)``. Note that
91+
implicity, ``incoming_node`` is ``self``. See "Edges" section above.
92+
"""
93+
raise NotImplementedError()
94+
95+
96+
DagEdge = namedtuple('DagEdge', ['downstream_node', 'downstream_label', 'upstream_node', 'upstream_label'])
97+
98+
99+
class KwargReprNode(DagNode):
100+
"""A DagNode that can be represented as a set of args+kwargs.
101+
"""
102+
def __get_hash(self):
103+
hashes = self.__upstream_hashes + [self.__inner_hash]
104+
hash_strs = [str(x) for x in hashes]
105+
hashes_str = ','.join(hash_strs).encode('utf-8')
106+
hash_str = hashlib.md5(hashes_str).hexdigest()
107+
return int(hash_str, base=16)
108+
109+
def __init__(self, incoming_edge_map, name, args, kwargs):
110+
self.__incoming_edge_map = incoming_edge_map
111+
self.name = name
112+
self.args = args
113+
self.kwargs = kwargs
114+
self.__hash = self.__get_hash()
115+
116+
@property
117+
def __upstream_hashes(self):
118+
hashes = []
119+
for downstream_label, (upstream_node, upstream_label) in self.incoming_edge_map.items():
120+
hashes += [hash(x) for x in [downstream_label, upstream_node, upstream_label]]
121+
return hashes
122+
123+
@property
124+
def __inner_hash(self):
125+
props = {'args': self.args, 'kwargs': self.kwargs}
126+
return _get_hash(props)
127+
128+
def __hash__(self):
129+
return self.__hash
130+
131+
def __eq__(self, other):
132+
return hash(self) == hash(other)
133+
134+
@property
135+
def short_hash(self):
136+
return '{:x}'.format(abs(hash(self)))[:12]
137+
138+
def __repr__(self):
139+
formatted_props = ['{!r}'.format(arg) for arg in self.args]
140+
formatted_props += ['{}={!r}'.format(key, self.kwargs[key]) for key in sorted(self.kwargs)]
141+
return '{}({}) <{}>'.format(self.name, ', '.join(formatted_props), self.short_hash)
142+
143+
@property
144+
def incoming_edges(self):
145+
edges = []
146+
for downstream_label, (upstream_node, upstream_label) in self.incoming_edge_map.items():
147+
downstream_node = self
148+
edges += [DagEdge(downstream_node, downstream_label, upstream_node, upstream_label)]
149+
return edges
150+
151+
@property
152+
def incoming_edge_map(self):
153+
return self.__incoming_edge_map
154+
155+
@property
156+
def short_repr(self):
157+
return self.name
158+
159+
160+
def topo_sort(start_nodes):
161+
marked_nodes = []
162+
sorted_nodes = []
163+
child_map = {}
164+
def visit(node, child):
165+
assert node not in marked_nodes, 'Graph is not a DAG'
166+
if child is not None:
167+
if node not in child_map:
168+
child_map[node] = []
169+
child_map[node].append(child)
170+
if node not in sorted_nodes:
171+
marked_nodes.append(node)
172+
[visit(parent, node) for parent in node._parents]
173+
marked_nodes.remove(node)
174+
sorted_nodes.append(node)
175+
unmarked_nodes = list(copy.copy(start_nodes))
176+
while unmarked_nodes:
177+
visit(unmarked_nodes.pop(), None)
178+
return sorted_nodes, child_map

ffmpeg/nodes.py

Lines changed: 14 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -1,105 +1,22 @@
11
from __future__ import unicode_literals
22

3-
from builtins import object
4-
import copy
5-
import hashlib
3+
from .dag import KwargReprNode
64

75

8-
def _recursive_repr(item):
9-
"""Hack around python `repr` to deterministically represent dictionaries.
10-
11-
This is able to represent more things than json.dumps, since it does not require things to be JSON serializable
12-
(e.g. datetimes).
13-
"""
14-
if isinstance(item, basestring):
15-
result = str(item)
16-
elif isinstance(item, list):
17-
result = '[{}]'.format(', '.join([_recursive_repr(x) for x in item]))
18-
elif isinstance(item, dict):
19-
kv_pairs = ['{}: {}'.format(_recursive_repr(k), _recursive_repr(item[k])) for k in sorted(item)]
20-
result = '{' + ', '.join(kv_pairs) + '}'
21-
else:
22-
result = repr(item)
23-
return result
24-
25-
26-
def _create_hash(item):
27-
hasher = hashlib.sha224()
28-
repr_ = _recursive_repr(item)
29-
hasher.update(repr_.encode('utf-8'))
30-
return hasher.hexdigest()
31-
32-
33-
class _NodeBase(object):
34-
@property
35-
def hash(self):
36-
if se 10000 lf._hash is None:
37-
self._update_hash()
38-
return self._hash
39-
40-
def __init__(self, parents, name):
41-
parent_hashes = [hash(parent) for parent in parents]
42-
assert len(parent_hashes) == len(set(parent_hashes)), 'Same node cannot be included as parent multiple times'
43-
self._parents = parents
44-
self._hash = None
45-
self._name = name
46-
47-
def _transplant(self, new_parents):
48-
other = copy.copy(self)
49-
other._parents = copy.copy(new_parents)
50-
return other
51-
52-
@property
53-
def _repr_args(self):
54-
raise NotImplementedError()
55-
56-
@property
57-
def _repr_kwargs(self):
58-
raise NotImplementedError()
59-
60-
@property
61-
def _short_hash(self):
62-
return '{:x}'.format(abs(hash(self)))[:12]
63-
64-
def __repr__(self):
65-
args = self._repr_args
66-
kwargs = self._repr_kwargs
67-
formatted_props = ['{!r}'.format(arg) for arg in args]
68-
formatted_props += ['{}={!r}'.format(key, kwargs[key]) for key in sorted(kwargs)]
69-
return '{}({}) <{}>'.format(self._name, ', '.join(formatted_props), self._short_hash)
70-
71-
def __hash__(self):
72-
if self._hash is None:
73-
self._update_hash()
74-
return self._hash
75-
76-
def __eq__(self, other):
77-
return hash(self) == hash(other)
78-
79-
def _update_hash(self):
80-
props = {'args': self._repr_args, 'kwargs': self._repr_kwargs}
81-
my_hash = _create_hash(props)
82-
parent_hashes = [str(hash(parent)) for parent in self._parents]
83-
hashes = parent_hashes + [my_hash]
84-
hashes_str = ','.join(hashes).encode('utf-8')
85-
hash_str = hashlib.md5(hashes_str).hexdigest()
86-
self._hash = int(hash_str, base=16)
87-
88-
89-
class Node(_NodeBase):
6+
class Node(KwargReprNode):
907
"""Node base"""
918
def __init__(self, parents, name, *args, **kwargs):
92-
super(Node, self).__init__(parents, name)
93-
self._args = args
94-
self._kwargs = kwargs
95-
96-
@property
97-
def _repr_args(self):
98-
return self._args
9+
incoming_edge_map = {}
10+
for downstream_label, parent in enumerate(parents):
11+
upstream_label = 0 # assume nodes have a single output (FIXME)
12+
upstream_node = parent
13+
incoming_edge_map[downstream_label] = (upstream_node, upstream_label)
14+
super(Node, self).__init__(incoming_edge_map, name, args, kwargs)
9915

10016
@property
101-
def _repr_kwargs(self):
102-
return self._kwargs
17+
def _parents(self):
18+
# TODO: change graph compilation to use `self.incoming_edges` instead.
19+
return [edge.upstream_node for edge in self.incoming_edges]
10320

10421

10522
class InputNode(Node):
@@ -111,9 +28,9 @@ def __init__(self, name, *args, **kwargs):
11128
class FilterNode(Node):
11229
"""FilterNode"""
11330
def _get_filter(self):
114-
params_text = self._name
115-
arg_params = ['{}'.format(arg) for arg in self._args]
116-
kwarg_params = ['{}={}'.format(k, self._kwargs[k]) for k in sorted(self._kwargs)]
31+
params_text = self.name
32+
arg_params = ['{}'.format(arg) for arg in self.args]
33+
kwarg_params = ['{}={}'.format(k, self.kwargs[k]) for k in sorted(self.kwargs)]
11734
params = arg_params + kwarg_params
11835
if params:
11936
params_text += '={}'.format(':'.join(params))

0 commit comments

Comments
 (0)
0