4
4
5
5
6
6
class Environment :
7
+ """
8
+ Base class for environments that are used to execute tasks.
9
+ Right now it is asssumed that the environment, including container images,
10
+ are available and are not removed at the end
11
+ TODO: add setup and teardown methods
12
+ """
13
+
7
14
def setup (self ):
8
15
pass
9
16
10
17
def execute (self , task ):
18
+ """
19
+ Execute the task in the environment.
20
+
21
+ Parameters
22
+ ----------
23
+ task : TaskBase
24
+ the task to execute
25
+
26
+ Returns
27
+ -------
28
+ output
29
+ Output of the task.
30
+ """
11
31
raise NotImplementedError
12
32
13
33
def teardown (self ):
14
34
pass
15
35
16
36
17
37
class Native (Environment ):
38
+ """
39
+ Native environment, i.e. the tasks are executed in the current python environment.
40
+ """
41
+
18
42
def execute (self , task ):
19
- # breakpoint()
20
- # args = task.render_arguments_in_root()
21
43
keys = ["return_code" , "stdout" , "stderr" ]
22
44
values = execute (task .command_args (), strip = task .strip )
23
45
output = dict (zip (keys , values ))
@@ -31,43 +53,65 @@ def execute(self, task):
31
53
return output
32
54
33
55
34
- class Docker (Environment ):
35
- def __init__ (self , image , tag = "latest" , output_cpath = "/output_pydra" , xargs = None ):
56
+ class Container (Environment ):
57
+ """
58
+ Base class for container environments used by Docker and Singularity.
59
+
60
+ Parameters
61
+ ----------
62
+ image : str
63
+ Name of the container image
64
+ tag : str
65
+ Tag of the container image
66
+ output_cpath : str
67
+ Path to the output directory in the container
68
+ xargs : Union[str, List[str]]
69
+ Extra arguments to be passed to the container
70
+ """
71
+
72
+ def __init__ (self , image , tag = "latest" , root = "/mnt/pydra" , xargs = None ):
36
73
self .image = image
37
74
self .tag = tag
75
+ if xargs is None :
76
+ xargs = []
77
+ elif isinstance (xargs , str ):
78
+ xargs = xargs .split ()
38
79
self .xargs = xargs
39
- self .output_cpath = output_cpath
80
+ self .root = root
40
81
41
82
@staticmethod
42
- def bind (loc , mode = "ro" , root = "/mnt/pydra" ): # TODO
43
- # XXX Failure mode: {loc} overwrites a critical directory in image
44
- # To fix, we'll need to update any args within loc to a new location
45
- # such as /mnt/pydra/loc
83
+ def bind (loc , mode = "ro" , root = "/mnt/pydra" ):
46
84
loc_abs = Path (loc ).absolute ()
47
- return f"{ loc_abs } :{ root } { loc_abs } :{ mode } " # TODO: moving entire path?
85
+ return f"{ loc_abs } :{ root } { loc_abs } :{ mode } "
86
+
48
87
49
- def execute (self , task , root = "/mnt/pydra" ):
50
- # XXX Need to mount all input locations
88
+ class Docker (Container ):
89
+ """Docker environment."""
90
+
91
+ def execute (self , task ):
51
92
docker_img = f"{ self .image } :{ self .tag } "
52
- # TODO ?
53
- # Skips over any inputs in task.cache_dir
54
- # Needs to include `out_file`s when not relative to working dir
55
- # Possibly a `TargetFile` type to distinguish between `File` and `str`?
56
- mounts = task .get_bindings (root = root )
93
+ # mounting all input locations
94
+ mounts = task .get_bindings (root = self .root )
57
95
58
96
# todo adding xargsy etc
59
- docker_args = ["docker" , "run" , "-v" , self .bind (task .cache_dir , "rw" )]
97
+ docker_args = [
98
+ "docker" ,
99
+ "run" ,
100
+ "-v" ,
101
+ self .bind (task .cache_dir , "rw" , self .root ),
102
+ ]
103
+ docker_args .extend (self .xargs )
60
104
docker_args .extend (
61
105
" " .join (
62
106
[f"-v { key } :{ val [0 ]} :{ val [1 ]} " for (key , val ) in mounts .items ()]
63
107
).split ()
64
108
)
65
- docker_args .extend (["-w" , f"{ root } { task .output_dir } " ])
109
+ docker_args .extend (["-w" , f"{ self . root } { task .output_dir } " ])
66
110
keys = ["return_code" , "stdout" , "stderr" ]
67
111
# print("\n Docker args", docker_args)
68
112
69
113
values = execute (
70
- docker_args + [docker_img ] + task .command_args (root = "/mnt/pydra" ),
114
+ docker_args + [docker_img ] + task .command_args (root = self . root ),
71
115
strip = task .strip ,
72
116
)
73
117
output = dict (zip (keys , values ))
@@ -76,39 +120,35 @@ def execute(self, task, root="/mnt/pydra"):
76
120
raise RuntimeError (output ["stderr" ])
77
121
else :
78
122
raise RuntimeError (output ["stdout" ])
79
- # Any outputs that have been created with a re-rooted path need
80
- # to be de-rooted
81
- # task.finalize_outputs("/mnt/pydra") TODO: probably don't need it
82
123
return output
83
124
84
125
85
- class Singularity (Docker ):
86
- def execute (self , task , root = "/mnt/pydra" ):
87
- # XXX Need to mount all input locations
126
+ class Singularity (Container ):
127
+ """Singularity environment."""
128
+
129
+ def execute (self , task ):
88
130
singularity_img = f"{ self .image } :{ self .tag } "
89
- # TODO ?
90
- # Skips over any inputs in task.cache_dir
91
- # Needs to include `out_file`s when not relative to working dir
92
- # Possibly a `TargetFile` type to distinguish between `File` and `str`?
93
- mounts = task .get_bindings (root = root )
131
+ # mounting all input locations
132
+ mounts = task .get_bindings (root = self .root )
94
133
95
134
# todo adding xargsy etc
96
135
singularity_args = [
97
136
"singularity" ,
98
137
"exec" ,
99
138
"-B" ,
100
- self .bind (task .cache_dir , "rw" ),
139
+ self .bind (task .cache_dir , "rw" , self . root ),
101
140
]
141
+ singularity_args .extend (self .xargs )
102
142
singularity_args .extend (
103
143
" " .join (
104
144
[f"-B { key } :{ val [0 ]} :{ val [1 ]} " for (key , val ) in mounts .items ()]
105
145
).split ()
106
146
)
107
- singularity_args .extend (["--pwd" , f"{ root } { task .output_dir } " ])
147
+ singularity_args .extend (["--pwd" , f"{ self . root } { task .output_dir } " ])
108
148
keys = ["return_code" , "stdout" , "stderr" ]
109
149
110
150
values = execute (
111
- singularity_args + [singularity_img ] + task .command_args (root = "/mnt/pydra" ),
151
+ singularity_args + [singularity_img ] + task .command_args (root = self . root ),
112
152
strip = task .strip ,
113
153
)
114
154
output = dict (zip (keys , values ))
@@ -117,7 +157,4 @@ def execute(self, task, root="/mnt/pydra"):
117
157
raise RuntimeError (output ["stderr" ])
118
158
else :
119
159
raise RuntimeError (output ["stdout" ])
120
- # Any outputs that have been created with a re-rooted path need
121
- # to be de-rooted
122
- # task.finalize_outputs("/mnt/pydra") TODO: probably don't need it
123
160
return output
0 commit comments