10000 Merge pull request #593 from rcali21/PROV_io · nipype/pydra@a5dfe42 · GitHub
[go: up one dir, main page]

Skip to content

Commit a5dfe42

Browse files
authored
Merge pull request #593 from rcali21/PROV_io
Working on adding entity_message
2 parents 5bbb4a9 + 19beeba commit a5dfe42

File tree

2 files changed

+130
-31
lines changed

2 files changed

+130
-31
lines changed

pydra/engine/audit.py

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44
import json
55
import attr
66
from ..utils.messenger import send_message, make_message, gen_uuid, now, AuditFlag
7-
from .helpers import ensure_list, gather_runtime_info
7+
from .helpers import ensure_list, gather_runtime_info, hash_file
8+
from .specs import attr_fields, File, Directory
89

910

1011
class Audit:
@@ -173,15 +174,24 @@ def audit_task(self, task):
173174
import subprocess as sp
174175

175176
label = task.name
176-
if hasattr(task.inputs, "executable"):
177-
command = task.cmdline
178-
# assume function task
179-
else:
180-
command = None
181-
# if hasattr(task.inputs, "in_file"):
182-
# input_file = task.inputs.in_file
183-
# else:
184-
# input_file = None
177+
178+
command = task.cmdline if hasattr(task.inputs, "executable") else None
179+
attr_list = attr_fields(task.inputs)
180+
for attrs in attr_list:
181+
if attrs.type in [File, Directory]:
182+
input_name = attrs.name
183+
input_path = os.path.abspath(getattr(task.inputs, input_name))
184+
file_hash = hash_file(input_path)
185+
entity_id = f"uid:{gen_uuid()}"
186+
entity_message = {
187+
"@id": entity_id,
188+
"Label": input_name,
189+
"AtLocation": input_path,
190+
"GeneratedBy": None,
191+
"@type": "input",
192+
"digest": file_hash,
193+
}
194+
self.audit_message(entity_message, AuditFlag.PROV)
185195

186196
if command is not None:
187197
cmd_name = command.split()[0]
@@ -210,9 +220,4 @@ def audit_task(self, task):
210220
"AssociatedWith": version_cmd,
211221
}
212222

213-
# new code to be added here for i/o tracking - WIP
214-
215223
self.audit_message(start_message, AuditFlag.PROV)
216-
217-
# add more fields according to BEP208 doc
218-
# with every field, check in tests

pydra/engine/tests/test_task.py

Lines changed: 110 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,17 @@
1111
from ..core import Workflow
1212
from ..task import AuditFlag, ShellCommandTask, DockerTask, SingularityTask
1313
from ...utils.messenger import FileMessenger, PrintMessenger, collect_messages
14-
from .utils import gen_basic_wf, use_validator
15-
from ..specs import MultiInputObj, MultiOutputObj, SpecInfo, FunctionSpec, BaseSpec
14+
from .utils import gen_basic_wf, use_validator, Submitter
15+
from ..specs import (
16+
MultiInputObj,
17+
MultiOutputObj,
18+
SpecInfo,
19+
FunctionSpec,
20+
BaseSpec,
21+
ShellSpec,
22+
File,
23+
)
24+
from ..helpers import hash_file
1625

1726
no_win = pytest.mark.skipif(
1827
sys.platform.startswith("win"),
@@ -998,18 +1007,21 @@ def testfunc(a: int, b: float = 0.1) -> ty.NamedTuple("Output", [("out", float)]
9981007
funky.cache_dir = tmpdir
9991008
funky()
10001009
message_path = tmpdir / funky.checksum / "messages"
1001-
# go through each jsonld file in message_path and check if the label field exists
1002-
json_content = []
1010+
10031011
for file in glob(str(message_path) + "/*.jsonld"):
10041012
with open(file, "r") as f:
10051013
data = json.load(f)
1006-
print(data)
1007-
if "Label" in data:
1008-
json_content.append(True)
1009-
assert "testfunc" == data["Label"]
1014+
if "@type" in data:
10101015
if "AssociatedWith" in data:
1011-
assert None == data["AssociatedWith"]
1012-
assert any(json_content)
1016+
assert "testfunc" in data["Label"]
1017+
1018+
if "@type" in data:
1019+
if data["@type"] == "input":
1020+
assert None == data["Label"]
1021+
if "AssociatedWith" in data:
1022+
assert None == data["AssociatedWith"]
1023+
1024+
# assert any(json_content)
10131025

10141026

10151027
def test_audit_shellcommandtask(tmpdir):
@@ -1028,21 +1040,103 @@ def test_audit_shellcommandtask(tmpdir):
10281040
shelly()
10291041
message_path = tmpdir / shelly.checksum / "messages"
10301042
# go through each jsonld file in message_path and check if the label field exists
1031-
label_content = []
1043+
10321044
command_content = []
10331045

10341046
for file in glob(str(message_path) + "/*.jsonld"):
10351047
with open(file, "r") as f:
10361048
data = json.load(f)
1037-
print(data)
1038-
if "Label" in data:
1039-
label_content.append(True)
1049+
1050+
if "@type" in data:
1051+
if "AssociatedWith" in data:
1052+
assert "shelly" in data["Label"]
1053+
1054+
if "@type" in data:
1055+
if data["@type"] == "input":
1056+
assert data["Label"] == None
1057+
10401058
if "Command" in data:
10411059
command_content.append(True)
10421060
assert "ls -l" == data["Command"]
10431061

1044-
print(command_content)
1045-
assert any(label_content)
1062+
assert any(command_content)
1063+
1064+
1065+
def test_audit_shellcommandtask_file(tmpdir):
1066+
# sourcery skip: use-fstring-for-concatenation
1067+
import glob
1068+
import shutil
1069+
1070+
# create test.txt file with "This is a test" in it in the tmpdir
1071+
# create txt file in cwd
1072+
with open("test.txt", "w") as f:
1073+
f.write("This is a test")
1074+
1075+
with open("test2.txt", "w") as f:
1076+
f.write("This is a test")
1077+
1078+
# copy the test.txt file to the tmpdir
1079+
shutil.copy("test.txt", tmpdir)
1080+
shutil.copy("test2.txt", tmpdir)
1081+
1082+
cmd = "cat"
1083+
file_in = tmpdir / "test.txt"
1084+
file_in_2 = tmpdir / "test2.txt"
1085+
test_file_hash = hash_file(file_in)
1086+
test_file_hash_2 = hash_file(file_in_2)
1087+
my_input_spec = SpecInfo(
1088+
name="Input",
1089+
fields=[
1090+
(
1091+
"in_file",
1092+
attr.ib(
1093+
type=File,
1094+
metadata={
1095+
"position": 1,
1096+
"argstr": "",
1097+
"help_string": "text",
1098+
"mandatory": True,
1099+
},
1100+
),
1101+
),
1102+
(
1103+
"in_file_2",
1104+
attr.ib(
1105+
type=File,
1106+
metadata={
1107+
"position": 2,
1108+
"argstr": "",
1109+
"help_string": "text",
1110+
"mandatory": True,
1111+
},
1112+
),
1113+
),
1114+
],
1115+
bases=(ShellSpec,),
1116+
)
1117+
shelly = ShellCommandTask(
1118+
name="shelly",
1119+
in_file=file_in,
1120+
in_file_2=file_in_2,
1121+
input_spec=my_input_spec,
1122+
executable=cmd,
1123+
audit_flags=AuditFlag.PROV,
1124+
messengers=FileMessenger(),
1125+
)
1126+
shelly.cache_dir = tmpdir
1127+
shelly()
1128+
message_path = tmpdir / shelly.checksum / "messages"
1129+
for file in glob.glob(str(message_path) + "/*.jsonld"):
1130+
with open(file, "r") as x:
1131+
data = json.load(x)
1132+
if "@type" in data:
1133+
if data["@type"] == "input":
1134+
if data["Label"] == "in_file":
1135+
assert data["AtLocation"] == str(file_in)
1136+
assert data["digest"] == test_file_hash
1137+
if data["Label"] == "in_file_2":
1138+
assert data["AtLocation"] == str(file_in_2)
1139+
assert data["digest"] == test_file_hash_2
10461140

10471141

10481142
def test_audit_shellcommandtask_version(tmpdir):

0 commit comments

Comments
 (0)
0