@@ -875,18 +875,16 @@ def run_el(self, runnable, rerun=False, **kwargs):
875
875
876
876
async def exec_dask (self , runnable , rerun = False ):
877
877
"""Run a task (coroutine wrapper)."""
878
- if self .client is None :
879
- from dask .distributed import Client
880
-
881
- self .client = await Client (** self .client_args , asynchronous = True )
882
-
883
- if isinstance (runnable , TaskBase ):
884
- future = self .client .submit (runnable ._run , rerun )
885
- result = await future
886
- else : # it could be tuple that includes pickle files with tasks and inputs
887
- ind , task_main_pkl , task_orig = runnable
888
- future = self .client .submit (load_and_run , task_main_pkl , ind , rerun )
889
- result = await future
878
+ from dask .distributed import Client
879
+
880
+ async with Client (** self .client_args , asynchronous = True ) as client :
881
+ if isinstance (runnable , TaskBase ):
882
+ future = client .submit (runnable ._run , rerun )
883
+ result = await future
884
+ else : # it could be tuple that includes pickle files with tasks and inputs
885
+ ind , task_main_pkl , task_orig = runnable
886
+ future = client .submit (load_and_run , task_main_pkl , ind , rerun )
887
+ result = await future
890
888
return result
891
889
892
890
def close (self ):
0 commit comments