8000 Merge 3d687b06db10f4897beddd9548622960bdaa62bc into 29d3d1fbafb6dc29b… · nipype/pydra@0f646d2 · GitHub
[go: up one dir, main page]

Skip to content

Commit 0f646d2

Browse files
authored
Merge 3d687b0 into 29d3d1f
2 parents 29d3d1f + 3d687b0 commit 0f646d2

File tree

2 files changed

+55
-6
lines changed

2 files changed

+55
-6
lines changed

.github/workflows/testdask.yml

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
name: Dask
2+
3+
on:
4+
push:
5+
branches:
6+
- master
7+
pull_request:
8+
9+
concurrency:
10+
group: ${{ github.workflow }}-${{ github.ref }}
11+
cancel-in-progress: true
12+
13+
permissions:
14+
contents: read
15+
16+
jobs:
17+
test:
18+
strategy:
19+
matrix:
20+
os: [ubuntu-latest, macos-latest]
21+
python-version: ['3.9', '3.10', '3.11']
22+
fail-fast: false
23+
runs-on: ${{ matrix.os }}
24+
25+
steps:
26+
- name: Checkout repository
27+
uses: actions/checkout@v3
28+
with:
29+
repository: ${{ github.repository }}
30+
31+
- name: Setup Python version ${{ matrix.python-version }}
32+
uses: actions/setup-python@v4
33+
with:
34+
python-version: ${{ matrix.python-version }}
35+
36+
- name: Install dependencies for Dask
37+
run: |
38+
pip install -e ".[test,dask]"
39+
40+
- name: Run tests for Dask
41+
run: |
42+
pytest -v --dask pydra/engine --cov pydra --cov-config .coveragerc --cov-report xml:cov.xml
43+
44+
- name: Upload to codecov
45+
run: codecov -f cov.xml -F unittests -e GITHUB_WORKFLOW

pydra/engine/workers.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -875,12 +875,16 @@ def run_el(self, runnable, rerun=False, **kwargs):
875875

876876
async def exec_dask(self, runnable, rerun=False):
877877
"""Run a task (coroutine wrapper)."""
878-
if self.client is None:
879-
from dask.distributed import Client
880-
881-
self.client = await Client(**self.client_args, asynchronous=True)
882-
future = self.client.submit(runnable._run, rerun)
883-
result = await future
878+
from dask.distributed import Client
879+
880+
async with Client(**self.client_args, asynchronous=True) as client:
881+
if isinstance(runnable, TaskBase):
882+
future = client.submit(runnable._run, rerun)
883+
result = await future
884+
else: # it could be tuple that includes pickle files with tasks and inputs
885+
ind, task_main_pkl, task_orig = runnable
886+
future = client.submit(load_and_run, task_main_pkl, ind, rerun)
887+
result = await future
884888
return result
885889

886890
def close(self):

0 commit comments

Comments
 (0)
0