1
1
from __future__ import unicode_literals
2
2
3
+ import warnings
4
+
3
5
from .dag import KwargReprNode
4
6
from ._utils import escape_chars , get_hash_int
5
7
from builtins import object
6
- import os
8
+ import os , sys
9
+ import inspect
7
10
8
11
9
12
def _is_of_types (obj , types ):
@@ -19,6 +22,13 @@ def _get_types_str(types):
19
22
return ', ' .join (['{}.{}' .format (x .__module__ , x .__name__ ) for x in types ])
20
23
21
24
25
+ def _get_arg_count (callable ):
26
+ if sys .version_info .major >= 3 :
27
+ return len (inspect .getfullargspec (callable ).args )
28
+ else :
29
+ return len (inspect .getargspec (callable ).args )
30
+
31
+
22
32
class Stream (object ):
23
33
"""Represents the outgoing edge of an upstream node; may be used to create more downstream nodes."""
24
34
@@ -30,6 +40,7 @@ def __init__(self, upstream_node, upstream_label, node_types, upstream_selector=
30
40
self .label = upstream_label
31
41
self .selector = upstream_selector
32
42
43
+
33
44
def __hash__ (self ):
34
45
return get_hash_int ([hash (self .node ), hash (self .label )])
35
46
@@ -85,6 +96,22 @@ def get_stream_spec_nodes(stream_spec):
85
96
class Node (KwargReprNode ):
86
97
"""Node base"""
87
98
99
+ @property
100
+ def min_inputs (self ):
101
+ return self .__min_inputs
102
+
103
+ @property
104
+ def max_inputs (self ):
105
+ return self .__max_inputs
106
+
107
+ @property
108
+ def incoming_stream_types (self ):
109
+ return self .__incoming_stream_types
110
+
111
+ @property
112
+ def outgoing_stream_type (self ):
113
+ return self .__outgoing_stream_type
114
+
88
115
@classmethod
89
116
def __check_input_len (cls , stream_map , min_inputs , max_inputs ):
90
117
if min_inputs is not None and len (stream_map ) < min_inputs :
@@ -106,19 +133,85 @@ def __get_incoming_edge_map(cls, stream_map):
106
133
incoming_edge_map [downstream_label ] = (upstream .node , upstream .label , upstream .selector )
107
134
return incoming_edge_map
108
135
109
- def __init__ (self , stream_spec , name , incoming_stream_types , outgoing_stream_type , min_inputs , max_inputs , args = [],
110
- kwargs = {}):
136
+ def __init_fromscratch__ (self , stream_spec , name , incoming_stream_types , outgoing_stream_type , min_inputs ,
137
+ max_inputs , args = [],
138
+ kwargs = {}):
111
139
stream_map = get_stream_map (stream_spec )
112
140
self .__check_input_len (stream_map , min_inputs , max_inputs )
113
141
self .__check_input_types (stream_map , incoming_stream_types )
114
142
incoming_edge_map = self .__get_incoming_edge_map (stream_map )
143
+
115
144
super (Node , self ).__init__ (incoming_edge_map , name , args , kwargs )
116
145
self .__outgoing_stream_type = outgoing_stream_type
146
+ self .__incoming_stream_types = incoming_stream_types
147
+ self .__min_inputs = min_inputs
148
+ self .__max_inputs = max_inputs
149
+
150
+ def __init_fromnode__ (self , old_node , stream_spec ):
151
+ # Make sure old node and new node are of the same type
152
+ if type (self ) != type (old_node ):
153
+ raise ValueError ("'old_node' should be of type {}" .format (self .__class__ .__name__ ))
154
+
155
+ # Copy needed data from old node
156
+ name = old_node .name
157
+ incoming_stream_types = old_node .incoming_stream_types
158
+ outgoing_stream_type = old_node .outgoing_stream_type
159
+ min_inputs = old_node .min_inputs
160
+ max_inputs = old_node .max_inputs
161
+ prev_edges = old_node .incoming_edge_map .values ()
162
+ args = old_node .args
163
+ kwargs = old_node .kwargs
164
+
165
+ # Check new stream spec - the old spec should have already been checked
166
+ new_stream_map = get_stream_map (stream_spec )
167
+ self .__check_input_types (new_stream_map , incoming_stream_types )
168
+
169
+ # Generate new edge map
170
+ new_inc_edge_map = self .__get_incoming_edge_map (new_stream_map )
171
+ new_edges = new_inc_edge_map .values ()
172
+
173
+ # Rename all edges
174
+ new_edge_map = dict (enumerate (list (prev_edges ) + list (new_edges )))
175
+
176
+ # Check new length
177
+ self .__check_input_len (new_edge_map , min_inputs , max_inputs )
117
178
179
+ super (Node , self ).__init__ (new_edge_map , name , args , kwargs )
180
+ self .__outgoing_stream_type = outgoing_stream_type
118
181
self .__incoming_stream_types = incoming_stream_types
119
182
self .__min_inputs = min_inputs
120
183
self .__max_inputs = max_inputs
121
184
185
+ # noinspection PyMissingConstructor
186
+ def __init__ (self , * args , ** kwargs ):
187
+ """
188
+ If called with the following arguments, the new Node is created from scratch:
189
+ - stream_spec, name, incoming_stream_types, outgoing_stream_type, min_inputs, max_inputs, args=[], kwargs={}
190
+
191
+ If called with the following arguments, the new node is a copy of `old_node` that includes the additional
192
+ `stream_spec`:
193
+ - old_node, stream_spec
194
+ """
195
+ # Python doesn't support constructor overloading. This hacky code detects how we want to construct the object
196
+ # based on the number of arguments and the type of the first argument, then calls the appropriate constructor
197
+ # helper method
198
+
199
+ # "1+" is for `self`
200
+ argc = 1 + len (args ) + len (kwargs )
201
+
202
+ first_arg = "old_node" in kwargs and kwargs ["old_node" ] or args [0 ]
203
+
204
+ if argc == _get_arg_count (self .__init_fromnode__ ) and type (first_arg ) == type (self ):
205
+ self .__init_fromnode__ (* args , ** kwargs )
206
+ else :
207
+ if isinstance (first_arg , Node ):
208
+ raise ValueError (
209
+ "{}.__init__() received an instance of {} as the first argument. If you want to create a "
210
+ "copy of an existing node, the types must match and you must provide an additional stream_spec."
211
+ .format (self .__class__ .__name__ , first_arg .__class__ .__name__ )
212
+ )
213
+ self .__init_fromscratch__ (* args , ** kwargs )
214
+
122
215
def stream (self , label = None , select = None ):
123
216
"""Create an outgoing stream originating from this node.
124
217
@@ -136,43 +229,19 @@ def __getitem__(self, item):
136
229
else :
137
230
return self .stream (label = item )
138
231
139
- def _add_streams (self , stream_spec ):
140
- """Attach additional streams after the Node is initialized.
141
- """
142
- # Back up previous edges
143
- prev_edges = self .incoming_edge_map .values ()
144
-
145
- # Check new edges
146
- new_stream_map = get_stream_map (stream_spec )
147
- self .__check_input_types (new_stream_map , self .__incoming_stream_types )
148
-
149
- # Generate new edge map
150
- new_inc_edge_map = self .__get_incoming_edge_map (new_stream_map )
151
- new_edges = new_inc_edge_map .values ()
152
-
153
- # Rename all edges
154
- new_edge_map = dict (enumerate (list (prev_edges ) + list (new_edges )))
155
-
156
- # Check new length
157
- self .__check_input_len (new_edge_map , self .__min_inputs , self .__max_inputs )
158
-
159
- # Overwrite old map (exploiting the fact that dict is mutable; incoming_edge_map is a read-only property)
160
- if None in self .incoming_edge_map :
161
- self .incoming_edge_map .pop (None )
162
- self .incoming_edge_map .update (new_edge_map )
163
-
164
232
165
233
class FilterableStream (Stream ):
166
234
def __init__ (self , upstream_node , upstream_label , upstream_selector = None ):
167
235
super (FilterableStream , self ).__init__ (upstream_node , upstream_label , {InputNode , FilterNode },
168
236
upstream_selector )
169
237
170
238
239
+ # noinspection PyMethodOverriding
171
240
class InputNode (Node ):
172
241
"""InputNode type"""
173
242
174
- def __init__ (self , name , args = [], kwargs = {}):
175
- super (InputNode , self ).__init__ (
243
+ def __init_fromscratch__ (self , name , args = [], kwargs = {}):
244
+ super (InputNode , self ).__init_fromscratch__ (
176
245
stream_spec = None ,
177
246
name = name ,
178
247
incoming_stream_types = {},
@@ -183,14 +252,18 @@ def __init__(self, name, args=[], kwargs={}):
183
252
kwargs = kwargs
184
253
)
185
254
255
+ def __init_fromnode__ (self , old_node , stream_spec ):
256
+ raise TypeError ("{} can't be constructed from an existing node" .format (self .__class__ .__name__ ))
257
+
186
258
@property
187
259
def short_repr (self ):
188
260
return os .path .basename (self .kwargs ['filename' ])
189
261
190
262
263
+ # noinspection PyMethodOverriding
191
264
class FilterNode (Node ):
192
- def __init__ (self , stream_spec , name , max_inputs = 1 , args = [], kwargs = {}):
193
- super (FilterNode , self ).__init__ (
265
+ def __init_fromscratch__ (self , stream_spec , name , max_inputs = 1 , args = [], kwargs = {}):
266
+ super (FilterNode , self ).__init_fromscratch__ (
194
267
stream_spec = stream_spec ,
195
268
name = name ,
196
269
incoming_stream_types = {FilterableStream },
@@ -227,9 +300,10 @@ def _get_filter(self, outgoing_edges):
227
300
return escape_chars (params_text , '\\ \' [],;' )
228
301
229
302
303
+ # noinspection PyMethodOverriding
230
304
class OutputNode (Node ):
231
- def __init__ (self , stream , name , args = [], kwargs = {}):
232
- super (OutputNode , self ).__init__ (
305
+ def __init_fromscratch__ (self , stream , name , args = [], kwargs = {}):
306
+ super (OutputNode , self ).__init_fromscratch__ (
233
307
stream_spec = stream ,
234
308
name = name ,
235
309
incoming_stream_types = {FilterableStream },
@@ -251,9 +325,10 @@ def __init__(self, upstream_node, upstream_label, upstream_selector=None):
251
325
upstream_selector = upstream_selector )
252
326
253
327
328
+ # noinspection PyMethodOverriding
254
329
class MergeOutputsNode (Node ):
255
- def __init__ (self , streams , name ):
256
- super (MergeOutputsNode , self ).__init__ (
330
+ def __init_fromscratch__ (self , streams , name ):
331
+ super (MergeOutputsNode , self ).__init_fromscratch__ (
257
332
stream_spec = streams ,
258
333
name = name ,
259
334
incoming_stream_types = {OutputStream },
@@ -263,9 +338,10 @@ def __init__(self, streams, name):
263
338
)
264
339
265
340
341
+ # noinspection PyMethodOverriding
266
342
class GlobalNode (Node ):
267
- def __init__ (self , stream , name , args = [], kwargs = {}):
268
- super (GlobalNode , self ).__init__ (
343
+ def __init_fromscratch__ (self , stream , name , args = [], kwargs = {}):
344
+ super (GlobalNode , self ).__init_fromscratch__ (
269
345
stream_spec = stream ,
270
346
name = name ,
271
347
incoming_stream_types = {OutputStream },
@@ -276,6 +352,9 @@ def __init__(self, stream, name, args=[], kwargs={}):
276
352
kwargs = kwargs
277
353
)
278
354
355
+ def __init_fromnode__ (self , old_node , stream_spec ):
356
+ raise TypeError ("{} can't be constructed from an existing node" .format (self .__class__ .__name__ ))
357
+
279
358
280
359
def stream_operator (stream_classes = {Stream }, name = None ):
281
360
def decorator (func ):
0 commit comments