8000 Merge pull request #718 from djarecka/env · nipype/pydra@58038f5 · GitHub
[go: up one dir, main page]

Skip to content

Commit 58038f5

Browse files
authored
Merge pull request #718 from djarecka/env
[WIP] cleaning, fixing tests
2 parents 9cc0904 + 8d60dd1 commit 58038f5

File tree

5 files changed

+102
-214
lines changed

5 files changed

+102
-214
lines changed

pydra/engine/environments.py

Lines changed: 74 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -4,20 +4,42 @@
44

55

66
class Environment:
7+
"""
8+
Base class for environments that are used to execute tasks.
9+
Right now it is asssumed that the environment, including container images,
10+
are available and are not removed at the end
11+
TODO: add setup and teardown methods
12+
"""
13+
714
def setup(self):
815
pass
916

1017
def execute(self, task):
18+
"""
19+
Execute the task in the environment.
20+
21+
Parameters
22+
----------
23+
task : TaskBase
24+
the task to execute
25+
26+
Returns
27+
-------
28+
output
29+
Output of the task.
30+
"""
1131
raise NotImplementedError
1232

1333
def teardown(self):
1434
pass
1535

1636

1737
class Native(Environment):
38+
"""
39+
Native environment, i.e. the tasks are executed in the current python environment.
40+
"""
41+
1842
def execute(self, task):
19-
# breakpoint()
20-
# args = task.render_arguments_in_root()
2143
keys = ["return_code", "stdout", "stderr"]
2244
values = execute(task.command_args(), strip=task.strip)
2345
output = dict(zip(keys, values))
@@ -31,43 +53,65 @@ def execute(self, task):
3153
return output
3254

3355

34-
class Docker(Environment):
35-
def __init__(self, image, tag="latest", output_cpath="/output_pydra", xargs=None):
56+
class Container(Environment):
57+
"""
58+
Base class for container environments used by Docker and Singularity.
59+
60+
Parameters
61+
----------
62+
image : str
63+
Name of the container image
64+
tag : str
65+
Tag of the container image
66+
output_cpath : str
67+
Path to the output directory in the container
68+
xargs : Union[str, List[str]]
69+
Extra arguments to be passed to the container
70+
"""
71+
72+
def __init__(self, image, tag="latest", root="/mnt/pydra", xargs=None):
3673
self.image = image
3774
self.tag = tag
75+
if xargs is None:
76+
xargs = []
77+
elif isinstance(xargs, str):
78+
xargs = xargs.split()
3879
self.xargs = xargs
39-
self.output_cpath = output_cpath
80+
self.root = root
4081

4182
@staticmethod
42-
def bind(loc, mode="ro", root="/mnt/pydra"): # TODO
43-
# XXX Failure mode: {loc} overwrites a critical directory in image
44-
# To fix, we'll need to update any args within loc to a new location
45-
# such as /mnt/pydra/loc
83+
def bind(loc, mode="ro", root="/mnt/pydra"):
4684
loc_abs = Path(loc).absolute()
47-
return f"{loc_abs}:{root}{loc_abs}:{mode}" # TODO: moving entire path?
85+
return f"{loc_abs}:{root}{loc_abs}:{mode}"
86+
4887

49-
def execute(self, task, root="/mnt/pydra"):
50-
# XXX Need to mount all input locations
88+
class Docker(Container):
89+
"""Docker environment."""
90+
91+
def execute(self, task):
5192
docker_img = f"{self.image}:{self.tag}"
52-
# TODO ?
53-
# Skips over any inputs in task.cache_dir
54-
# Needs to include `out_file`s when not relative to working dir
55-
# Possibly a `TargetFile` type to distinguish between `File` and `str`?
56-
mounts = task.get_bindings(root=root)
93+
# mounting all input locations
94+
mounts = task.get_bindings(root=self.root)
5795

5896
# todo adding xargsy etc
59-
docker_args = ["docker", "run", "-v", self.bind(task.cache_dir, "rw")]
97+
docker_args = [
98+
"docker",
99+
"run",
100+
"-v",
101+
self.bind(task.cache_dir, "rw", self.root),
102+
]
103+
docker_args.extend(self.xargs)
60104
docker_args.extend(
61105
" ".join(
62106
[f"-v {key}:{val[0]}:{val[1]}" for (key, val) in mounts.items()]
63107
).split()
64108
)
65-
docker_args.extend(["-w", f"{root}{task.output_dir}"])
109+
docker_args.extend(["-w", f"{self.root}{task.output_dir}"])
66110
keys = ["return_code", "stdout", "stderr"]
67111
# print("\n Docker args", docker_args)
68112

69113
values = execute(
70-
docker_args + [docker_img] + task.command_args(root="/mnt/pydra"),
114+
docker_args + [docker_img] + task.command_args(root=self.root),
71115
strip=task.strip,
72116
)
73117
output = dict(zip(keys, values))
@@ -76,39 +120,35 @@ def execute(self, task, root="/mnt/pydra"):
76120
raise RuntimeError(output["stderr"])
77121
else:
78122
raise RuntimeError(output["stdout"])
79-
# Any outputs that have been created with a re-rooted path need
80-
# to be de-rooted
81-
# task.finalize_outputs("/mnt/pydra") TODO: probably don't need it
82123
return output
83124

84125

85-
class Singularity(Docker):
86-
def execute(self, task, root="/mnt/pydra"):
87-
# XXX Need to mount all input locations
126+
class Singularity(Container):
127+
"""Singularity environment."""
128+
129+
def execute(self, task):
88130
singularity_img = f"{self.image}:{self.tag}"
89-
# TODO ?
90-
# Skips over any inputs in task.cache_dir
91-
# Needs to include `out_file`s when not relative to working dir
92-
# Possibly a `TargetFile` type to distinguish between `File` and `str`?
93-
mounts = task.get_bindings(root=root)
131+
# mounting all input locations
132+
mounts = task.get_bindings(root=self.root)
94133

95134
# todo adding xargsy etc
96135
singularity_args = [
97136
"singularity",
98137
"exec",
99138
"-B",
100-
self.bind(task.cache_dir, "rw"),
139+
self.bind(task.cache_dir, "rw", self.root),
101140
]
141+
singularity_args.extend(self.xargs)
102142
singularity_args.extend(
103143
" ".join(
104144
[f"-B {key}:{val[0]}:{val[1]}" for (key, val) in mounts.items()]
105145
).split()
106146
)
107-
singularity_args.extend(["--pwd", f"{root}{task.output_dir}"])
147+
singularity_args.extend(["--pwd", f"{self.root}{task.output_dir}"])
108148
keys = ["return_code", "stdout", "stderr"]
109149

110150
values = execute(
111-
singularity_args + [singularity_img] + task.command_args(root="/mnt/pydra"),
151+
singularity_args + [singularity_img] + task.command_args(root=self.root),
112152
strip=task.strip,
113153
)
114154
output = dict(zip(keys, values))
@@ -117,7 +157,4 @@ def execute(self, task, root="/mnt/pydra"):
117157
raise RuntimeError(output["stderr"])
118158
else:
119159
raise RuntimeError(output["stdout"])
120-
# Any outputs that have been created with a re-rooted path need
121-
# to be de-rooted
122-
# task.finalize_outputs("/mnt/pydra") TODO: probably don't need it
123160
return output

pydra/engine/specs.py

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -676,23 +676,6 @@ def _check_requires(self, fld, inputs):
676676
return False
677677

678678

679-
@attr.s(auto_attribs=True, kw_only=True)
680-
class ContainerSpec(ShellSpec):
681-
"""Refine the generic command-line specification to container execution."""
682-
683-
image: ty.Union[File, str] = attr.ib(
684-
metadata={"help_string": "image", "mandatory": True}
685-
)
686-
"""The image to be containerized."""
687-
container: ty.Union[File, str, None] = attr.ib(
688-
metadata={"help_string": "container"}
689-
)
690-
"""The container."""
691-
container_xargs: ty.Optional[ty.List[str]] = attr.ib(
692-
default=None, metadata={"help_string": "todo"}
693-
)
694-
695-
696679
@attr.s
697680
class LazyInterface:
698681
_task: "core.TaskBase" = attr.ib()

0 commit comments

Comments
 (0)
0