Initial commit, proof of concept script to compile CWL workflows to bash · common-workflow-lab/cwl2script@e5fedff · GitHub
[go: up one dir, main page]

Skip to content

Commit e5fedff

Browse files
author
Peter Amstutz
committed
Initial commit, proof of concept script to compile CWL workflows to bash
scripts.
0 parents  commit e5fedff

File tree

1 file changed

+200
-0
lines changed

1 file changed

+200
-0
lines changed

cwl2script.py

Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
import argparse
2+
import cwltool.main
3+
import sys
4+
import os
5+
import schema_salad
6+
import logging
7+
from cwltool.process import checkRequirements, shortname, adjustFiles
8+
import shellescape
9+
import re
10+
import copy
11+
import json
12+
13+
# Search function that matches any argument which is NOT safe to paste into a
# /bin/sh command line verbatim: the empty string, or a string containing at
# least one character outside a conservative whitelist (ASCII letters, digits
# and the punctuation "_%+=:,./-").  A whitelist is used instead of a list of
# known-dangerous characters so that backticks, backslashes, glob characters,
# "!", "#", "~", braces, etc. are all flagged.  Everything the previous
# blacklist flagged (whitespace, | & ; ( ) < > ' " $ @ and "") is still flagged.
needs_shell_quoting = re.compile(r"""(^$|[^A-Za-z0-9_%+=:,./-])""").search

# Search function that matches the glob metacharacters "[", "]", "*" and "?".
glob_metacharacters = re.compile(r"""[\[\]\*?]""").search
15+
16+
def maybe_quote(arg):
    """Return *arg* in a form suitable for a shell command line.

    The argument is passed through ``shellescape.quote()`` only when
    ``needs_shell_quoting()`` finds a match in it; otherwise it is returned
    unchanged.
    """
    if needs_shell_quoting(arg):
        return shellescape.quote(arg)
    return arg
18+
19+
def generateScriptForTool(tool, job, outdir):
    """Render the shell commands that run a single command line tool.

    Only the first job object yielded by
    ``tool.job(job, "", None, outdir=outdir)`` is used.

    Args:
        tool: object exposing a ``job()`` generator method (presumably a
            cwltool process object -- confirm against the caller).
        job: input object handed to ``tool.job()``.
        outdir: output directory handed to ``tool.job()``; may be None.

    Returns:
        A ``(script, outdir, tmpdir)`` tuple.  ``script`` is a shell snippet
        that creates the job's output and temporary directories, runs the
        command line (with stdin/stdout redirections when the job defines
        them) and removes the temporary directory.  ``outdir`` and ``tmpdir``
        are the directories reported by the job object.

    Raises:
        Exception: if ``tool.job()`` yields no job at all.
    """
    # Take the first yielded job; the for/else raises when nothing is yielded
    # instead of silently falling through and returning None.
    for j in tool.job(job, "", None, outdir=outdir):
        break
    else:
        raise Exception("Tool did not generate a job to run")

    command = " ".join([maybe_quote(arg) for arg in j.command_line])
    stdin_redirect = ' < %s' % maybe_quote(j.stdin) if j.stdin else ''
    # stdout is captured into the job's output directory.
    stdout_redirect = (' > %s' % maybe_quote(os.path.join(j.outdir, j.stdout))
                       if j.stdout else '')

    script = """mkdir -p %s # output directory
mkdir -p %s # temporary directory
%s%s%s
rm -r %s # clean up temporary directory
""" % (maybe_quote(j.outdir), maybe_quote(j.tmpdir),
       command, stdin_redirect, stdout_redirect,
       maybe_quote(j.tmpdir))

    return (script, j.outdir, j.tmpdir)
31+
32+
33+
def generateScriptForWorkflow(cwlwf, cwljob, outdir):
    """Compile a workflow into a single /bin/sh script.

    Steps are emitted in dependency order: a step is added once every input
    that has a "source" refers to a value already promised by a workflow
    input or by an earlier step's output.

    Args:
        cwlwf: workflow object; ``cwlwf.tool`` supplies "id", "inputs" and
            "outputs", and ``cwlwf.steps`` supplies the steps (each with
            ``.tool`` and ``.embedded_tool``).
        cwljob: input object, indexed by the short name of each workflow
            input.
        outdir: currently unused; every step is generated with
            ``outdir=None``.

    Returns:
        The script text, lines joined with newlines.

    Raises:
        Exception: if a step output is not of type "File", if its glob
            contains glob metacharacters, or if a pass over the steps adds
            nothing while workflow outputs are still unsatisfied.
        KeyError: if a workflow output has no "source" entry.
    """
    # Maps an input/output id to (producer, value); producer is the workflow
    # for workflow inputs and the step for step outputs.
    promises = {}
    # Ids of the steps that have already been written to the script.
    jobs = {}
    script = ["#!/bin/sh",
              "",
              "# Workflow generated from %s using cwl2script" % (cwlwf.tool["id"]),
              "",
              "set -x",
              ""]

    outdirs = []

    for inp in cwlwf.tool["inputs"]:
        promises[inp["id"]] = (cwlwf, cwljob[shortname(inp["id"])])

    alloutputs_fulfilled = False
    while not alloutputs_fulfilled:
        # Iteratively go over the workflow steps, adding jobs to the script as
        # their dependencies are fulfilled by upstream workflow inputs or
        # step outputs.  Loop exits when the workflow outputs are satisfied.

        alloutputs_fulfilled = True

        progress = False
        for step in cwlwf.steps:
            if step.tool["id"] in jobs:
                continue

            stepinputs_fulfilled = all(
                "source" not in inp or inp["source"] in promises
                for inp in step.tool["inputs"])
            if not stepinputs_fulfilled:
                continue

            jobobj = {}

            # TODO: Handle multiple inbound links
            # TODO: Handle scatter/gather
            # (both are discussed in section 5.1.2 in CWL spec draft-2)

            script.append("# Run step %s" % step.tool["id"])

            for inp in step.tool["inputs"]:
                if "source" in inp:
                    producer, value = promises[inp["source"]]
                    jobobj[shortname(inp["id"])] = value
                    script.append("# depends on step %s" % producer.tool["id"])
                elif "default" in inp:
                    jobobj[shortname(inp["id"])] = copy.copy(inp["default"])

            (wfjob, joboutdir, _) = generateScriptForTool(step.embedded_tool, jobobj, None)
            outdirs.append(joboutdir)

            jobs[step.tool["id"]] = True

            script.append(wfjob)

            # Promise each step output by matching it (by short name) to the
            # embedded tool's output and predicting the file path from the
            # tool's output glob.
            for out in step.tool["outputs"]:
                for toolout in step.embedded_tool.tool["outputs"]:
                    if shortname(toolout["id"]) == shortname(out["id"]):
                        if toolout["type"] != "File":
                            raise Exception("Only supports file outputs")
                        if glob_metacharacters(toolout["outputBinding"]["glob"]):
                            raise Exception("Only support glob with concrete filename.")
                        promises[out["id"]] = (step, {"class":"File", "path": os.path.join(joboutdir, toolout["outputBinding"]["glob"])})
            progress = True

        for out in cwlwf.tool["outputs"]:
            if "source" in out:
                if out["source"] not in promises:
                    alloutputs_fulfilled = False

        # A full pass that added no step while outputs are still missing
        # would otherwise loop forever.
        if not alloutputs_fulfilled and not progress:
            raise Exception("Not making progress")

    outobj = {}
    script.append("# Move output files to the current directory")

    for out in cwlwf.tool["outputs"]:
        f = promises[out["source"]][1]
        script.append("mv %s ." % (maybe_quote(f["path"])))
        # After the move the file lives in the current directory, so the
        # output object reports just its base name.
        f["path"] = os.path.basename(f["path"])

        if f.get("secondaryFiles"):
            script.append("mv %s ." % (' '.join([maybe_quote(sf["path"]) for sf in f["secondaryFiles"]])))
            for sf in f["secondaryFiles"]:
                sf["path"] = os.path.basename(sf["path"])

        outobj[shortname(out["id"])] = f

    script.append("")
    # Only emit the cleanup when at least one step ran; "rm -r" without
    # operands is an error.
    if outdirs:
        script.append("# Clean up staging output directories")
        script.append("rm -r %s" % (' '.join([maybe_quote(od) for od in outdirs])))
        script.append("")

    script.append("# Generate final output object")
    # Single-quote the JSON for the shell, closing and reopening the quotes
    # around any embedded apostrophe.  printf is used rather than echo because
    # echo's treatment of the backslashes found in JSON escapes differs between
    # /bin/sh implementations.
    quoted_json = "'" + json.dumps(outobj, indent=4).replace("'", "'\"'\"'") + "'"
    script.append("printf '%s\\n' " + quoted_json)

    return "\n".join(script)
131+
132+
133+
134+
# Process requirement classes this compiler accepts; main() passes this list
# to checkRequirements() and returns exit code 33 if that check raises.
supportedProcessRequirements = ["SchemaDefRequirement"]
135+
136+
def main(args=None):
    """Command line entry point: compile a CWL document to a shell script.

    Loads the job (input object) and the tool or workflow, fills in defaults
    for inputs missing from the job, and prints the generated script to
    standard output.  With ``--conformance-test`` the tool is instead run
    through ``cwltool.main.single_job_executor`` and its result is written to
    standard output as JSON.

    Args:
        args: list of command line arguments (without the program name);
            None lets argparse read ``sys.argv``.

    Returns:
        The process exit status: the value returned by ``load_tool`` when it
        is an int, 33 when the requirements check raises or the process
        class is neither "Workflow" nor "CommandLineTool", and 0 otherwise.

    Raises:
        Exception: if a required input has neither a job value nor a default.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("cwltool", type=str)
    parser.add_argument("cwljob", type=str)

    parser.add_argument("--conformance-test", action="store_true")
    parser.add_argument("--no-container", action="store_true")
    parser.add_argument("--basedir", type=str)
    parser.add_argument("--outdir", type=str, default=os.getcwd())

    options = parser.parse_args(args)

    uri = "file://" + os.path.abspath(options.cwljob)

    if options.conformance_test:
        loader = schema_salad.ref_resolver.Loader({})
    else:
        loader = schema_salad.ref_resolver.Loader({
            "@base": uri,
            "path": {
                "@type": "@id"
            }
        })

    job, _ = loader.resolve_ref(uri)

    # NOTE(review): the positional flags are passed exactly as before; confirm
    # their meaning against the installed cwltool version.
    t = cwltool.main.load_tool(options.cwltool, False, False, cwltool.workflow.defaultMakeTool, True)

    # An int result is propagated as the exit status.
    if isinstance(t, int):
        return t

    try:
        checkRequirements(t.tool, supportedProcessRequirements)
    except Exception as e:
        logging.error(e)
        return 33

    # Fill in defaults for inputs missing from the job.  An input whose type
    # starts with "null" may be omitted; any other missing input is an error.
    for inp in t.tool["inputs"]:
        name = shortname(inp["id"])
        if name in job:
            continue
        if "default" in inp:
            job[name] = copy.copy(inp["default"])
        elif inp["type"][0] != "null":
            raise Exception("Missing inputs `%s`" % name)

    if options.conformance_test:
        sys.stdout.write(json.dumps(cwltool.main.single_job_executor(t, job, options.basedir, options, conformance_test=True), indent=4))
        return 0

    if not options.basedir:
        options.basedir = os.path.dirname(os.path.abspath(options.cwljob))

    outdir = options.outdir

    if t.tool["class"] == "Workflow":
        print(generateScriptForWorkflow(t, job, outdir))
    elif t.tool["class"] == "CommandLineTool":
        # generateScriptForTool returns (script, outdir, tmpdir); only the
        # script text belongs on standard output.
        script = generateScriptForTool(t, job, outdir)[0]
        print(script)
    else:
        # Emitting nothing and reporting success would hide the problem.
        logging.error("Unsupported process class `%s`", t.tool["class"])
        return 33

    return 0
198+
199+
if __name__ == "__main__":
    # Run as a script: pass the command line arguments (minus the program
    # name) to main() and use its return value as the exit status.
    cli_args = sys.argv[1:]
    sys.exit(main(cli_args))

0 commit comments

Comments
 (0)
0