8000 Merge pull request #25 from djarecka/adi611-adi611-patch-updatedask-1 · nipype/pydra@3d687b0 · GitHub
[go: up one dir, main page]

Skip to content

Commit 3d687b0

Browse files
authored
Merge pull request #25 from djarecka/adi611-adi611-patch-updatedask-1
properly closing the dask Client
2 parents a0778c8 + c7384b9 commit 3d687b0

File tree

2 files changed

+11
-13
lines changed

2 files changed

+11
-13
lines changed

.github/workflows/testdask.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ jobs:
2727
uses: actions/checkout@v3
2828
with:
2929
repository: ${{ github.repository }}
30-
30+
3131
- name: Setup Python version ${{ matrix.python-version }}
3232
uses: actions/setup-python@v4
3333
with:

pydra/engine/workers.py

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -875,18 +875,16 @@ def run_el(self, runnable, rerun=False, **kwargs):
875875

876876
async def exec_dask(self, runnable, rerun=False):
877877
"""Run a task (coroutine wrapper)."""
878-
if self.client is None:
879-
from dask.distributed import Client
880-
881-
self.client = await Client(**self.client_args, asynchronous=True)
882-
883-
if isinstance(runnable, TaskBase):
884-
future = self.client.submit(runnable._run, rerun)
885-
result = await future
886-
else: # it could be tuple that includes pickle files with tasks and inputs
887-
ind, task_main_pkl, task_orig = runnable
888-
future = self.client.submit(load_and_run, task_main_pkl, ind, rerun)
889-
result = await future
878+
from dask.distributed import Client
879+
880+
async with Client(**self.client_args, asynchronous=True) as client:
881+
if isinstance(runnable, TaskBase):
882+
future = client.submit(runnable._run, rerun)
883+
result = await future
884+
else: # it could be tuple that includes pickle files with tasks and inputs
885+
ind, task_main_pkl, task_orig = runnable
886+
future = client.submit(load_and_run, task_main_pkl, ind, rerun)
887+
result = await future
890888
return result
891889

892890
def close(self):

0 commit comments

Comments
 (0)
0