@@ -1764,11 +1764,8 @@ def __init__(
1764
1764
self .clients ["fire-and-forget" ] = ClientState ("fire-and-forget" )
1765
1765
self .extensions = {}
1766
1766
self .host_info = host_info
1767
- self .idle = SortedDict ()
1768
- self .idle_task_count = set ()
1769
1767
self .n_tasks = 0
1770
1768
self .resources = resources
1771
- self .saturated = set ()
1772
1769
self .tasks = tasks
1773
1770
self .replicated_tasks = {
1774
1771
ts for ts in self .tasks .values () if len (ts .who_has or ()) > 1
@@ -1852,7 +1849,6 @@ def __pdict__(self) -> dict[str, Any]:
1852
1849
return {
1853
1850
"bandwidth" : self .bandwidth ,
1854
1851
"resources" : self .resources ,
1855
- "saturated" : self .saturated ,
1856
1852
"unrunnable" : self .unrunnable ,
1857
1853
"queued" : self .queued ,
1858
1854
"n_tasks" : self .n_tasks ,
@@ -1867,7 +1863,6 @@ def __pdict__(self) -> dict[str, Any]:
1867
1863
"extensions" : self .extensions ,
1868
1864
"clients" : self .clients ,
1869
1865
"workers" : self .workers ,
1870
- "idle" : self .idle ,
1871
1866
"host_info" : self .host_info ,
1872
1867
}
1873
1868
@@ -2282,7 +2277,7 @@ def decide_worker_rootish_queuing_disabled(
2282
2277
# See root-ish-ness note below in `decide_worker_rootish_queuing_enabled`
2283
2278
assert math .isinf (self .WORKER_SATURATION ) or not ts ._queueable
2284
2279
2285
- pool = self .idle . values () if self . idle else self . running
2280
+ pool = self .running
2286
2281
if not pool :
2287
2282
return None
2288
2283
@@ -2347,22 +2342,16 @@ def decide_worker_rootish_queuing_enabled(self) -> WorkerState | None:
2347
2342
# then add that assertion here (and actually pass in the task).
2348
2343
assert not math .isinf (self .WORKER_SATURATION )
2349
2344
2350
- if not self .idle_task_count :
2351
- # All workers busy? Task gets/stays queued.
2345
+ if not self .running :
2352
2346
return None
2353
2347
2354
2348
# Just pick the least busy worker.
2355
2349
# NOTE: this will lead to worst-case scheduling with regards to co-assignment.
2356
- ws = min (
2357
- self .idle_task_count ,
2358
- key = lambda ws : len (ws .processing ) / ws .nthreads ,
2359
- )
2350
+ ws = min (self .running , key = lambda ws : len (ws .processing ) / ws .nthreads )
2351
+ if _worker_full (ws , self .WORKER_SATURATION ):
2352
+ return None
2360
2353
if self .validate :
2361
2354
assert self .workers .get (ws .address ) is ws
2362
- assert not _worker_full (ws , self .WORKER_SATURATION ), (
2363
- ws ,
2364
- _task_slots_available (ws , self .WORKER_SATURATION ),
2365
- )
2366
2355
assert ws in self .running , (ws , self .running )
2367
2356
2368
2357
return ws
@@ -2406,7 +2395,7 @@ def decide_worker_non_rootish(self, ts: TaskState) -> WorkerState | None:
2406
2395
# dependencies, but its group is also smaller than the cluster.
2407
2396
2408
2397
# Fastpath when there are no related tasks or restrictions
2409
- worker_pool = self .idle or self . workers
2398
+ worker_pool = self .workers
2410
2399
# FIXME idle and workers are SortedDict's declared as dicts
2411
2400
# because sortedcontainers is not annotated
2412
2401
wp_vals = cast ("Sequence[WorkerState]" , worker_pool .values ())
@@ -2906,7 +2895,6 @@ def _transition_waiting_queued(self, key: Key, stimulus_id: str) -> RecsMsgs:
2906
2895
ts = self .tasks [key ]
2907
2896
2908
2897
if self .validate :
2909
- assert not self .idle_task_count , (ts , self .idle_task_count )
2910
2898
self ._validate_ready (ts )
2911
2899
2912
2900
ts .state = "queued"
@@ -3094,63 +3082,6 @@ def is_rootish(self, ts: TaskState) -> bool:
3094
3082
and sum (map (len , tg .dependencies )) < 5
3095
3083
)
3096
3084
3097
- def check_idle_saturated (self , ws : WorkerState , occ : float = - 1.0 ) -> None :
3098
- """Update the status of the idle and saturated state
3099
-
3100
- The scheduler keeps track of workers that are ..
3101
-
3102
- - Saturated: have enough work to stay busy
3103
- - Idle: do not have enough work to stay busy
3104
-
3105
- They are considered saturated if they both have enough tasks to occupy
3106
- all of their threads, and if the expected runtime of those tasks is
3107
- large enough.
3108
-
3109
- If ``distributed.scheduler.worker-saturation`` is not ``inf``
3110
- (scheduler-side queuing is enabled), they are considered idle
3111
- if they have fewer tasks processing than the ``worker-saturation``
3112
- threshold dictates.
3113
-
3114
- Otherwise, they are considered idle if they have fewer tasks processing
3115
- than threads, or if their tasks' total expected runtime is less than half
3116
- the expected runtime of the same number of average tasks.
3117
-
3118
- This is useful for load balancing and adaptivity.
3119
- """
3120
- if self .total_nthreads == 0 or ws .status == Status .closed :
3121
- return
3122
- if occ < 0 :
3123
- occ = ws .occupancy
3124
-
3125
- p = len (ws .processing )
3126
-
3127
- self .saturated .discard (ws )
3128
- if ws .status != Status .running :
3129
- self .idle .pop (ws .address , None )
3130
- elif self .is_unoccupied (ws , occ , p ):
3131
- self .idle [ws .address ] = ws
3132
- else :
3133
- self .idle .pop (ws .address , None )
3134
- nc = ws .nthreads
3135
- if p > nc :
3136
- pending = occ * (p - nc ) / (p * nc )
3137
- if 0.4 < pending > 1.9 * (self .total_occupancy / self .total_nthreads ):
3138
- self .saturated .add (ws )
3139
-
3140
- if not _worker_full (ws , self .WORKER_SATURATION ) and ws .status == Status .running :
3141
- self .idle_task_count .add (ws )
3142
- else :
3143
- self .idle_task_count .discard (ws )
3144
-
3145
- def is_unoccupied (
3146
- self , ws : WorkerState , occupancy : float , nprocessing : int
3147
- ) -> bool :
3148
- nthreads = ws .nthreads
3149
- return (
3150
- nprocessing < nthreads
3151
- or occupancy < nthreads * (self .total_occupancy / self .total_nthreads ) / 2
3152
- )
3153
-
3154
3085
def get_comm_cost (self , ts : TaskState , ws : WorkerState ) -> float :
3155
3086
"""
3156
3087
Get the estimated communication cost (in s.) to compute the task
@@ -3357,7 +3288,6 @@ def _add_to_processing(
3357
3288
ts .processing_on = ws
3358
3289
ts .state = "processing"
3359
3290
self .acquire_resources (ts , ws )
3360
- self .check_idle_saturated (ws )
3361
3291
self .n_tasks += 1
3362
3292
3363
3293
if ts .actor :
@@ -3423,7 +3353,6 @@ def _exit_processing_common(self, ts: TaskState) -> WorkerState | None:
3423
3353
if self .workers .get (ws .address ) is not ws : # may have been removed
3424
3354
return None
3425
3355
3426
- self .check_idle_saturated (ws )
3427
3356
self .release_resources (ts , ws )
3428
3357
3429
3358
return ws
@@ -4547,10 +4476,6 @@ async def add_worker(
4547
4476
metrics = metrics ,
4548
4477
)
4549
4478
4550
- # Do not need to adjust self.total_occupancy as self.occupancy[ws] cannot
4551
- # exist before this.
4552
- self .check_idle_saturated (ws )
4553
-
4554
4479
self .stream_comms [address ] = BatchedSend (interval = "5ms" , loop = self .loop )
4555
4480
4556
4481
awaitables = []
@@ -5167,13 +5092,11 @@ def stimulus_queue_slots_maybe_opened(self, *, stimulus_id: str) -> None:
5167
5092
so any tasks that became runnable are already in ``processing``. Otherwise,
5168
5093
overproduction can occur if queued tasks get scheduled before downstream tasks.
5169
5094
5170
- Must be called after `check_idle_saturated`; i.e. `idle_task_count` must be up to date.
5171
5095
"""
5172
5096
if not self .queued :
5173
5097
return
5174
5098
slots_available = sum (
5175
- _task_slots_available (ws , self .WORKER_SATURATION )
5176
- for ws in self .idle_task_count
5099
+ _task_slots_available (ws , self .WORKER_SATURATION ) for ws in self .running
5177
5100
)
5178
5101
if slots_available == 0 :
5179
5102
return
@@ -5403,9 +5326,6 @@ async def remove_worker(
5403
5326
self .rpc .remove (address )
5404
5327
del self .stream_comms [address ]
5405
5328
del self .aliases [ws .name ]
5406
- self .idle .pop (ws .address , None )
5407
- self .idle_task_count .discard (ws )
5408
- self .saturated .discard (ws )
5409
5329
del self .workers [address ]
5410
5330
self ._workers_removed_total += 1
5411
5331
ws .status = Status .closed
@@ -5734,23 +5654,6 @@ def validate_state(self, allow_overlap: bool = False) -> None:
5734
5654
if not (set (self .workers ) == set (self .stream_comms )):
5735
5655
raise ValueError ("Workers not the same in all collections" )
5736
5656
5737
- assert self .running .issuperset (self .idle .values ()), (
5738
- self .running .copy (),
5739
- set (self .idle .values ()),
5740
- )
5741
- assert self .running .issuperset (self .idle_task_count ), (
5742
- self .running .copy (),
5743
- self .idle_task_count .copy (),
5744
- )
5745
- assert self .running .issuperset (self .saturated ), (
5746
- self .running .copy (),
5747
- self .saturated .copy (),
5748
- )
5749
- assert self .saturated .isdisjoint (self .idle .values ()), (
5750
- self .saturated .copy (),
5751
- set (self .idle .values ()),
5752
- )
5753
-
5754
5657
task_prefix_counts : defaultdict [str , int ] = defaultdict (int )
5755
5658
for w , ws in self .workers .items ():
5756
5659
assert isinstance (w , str ), (type (w ), w )
@@ -5761,14 +5664,10 @@ def validate_state(self, allow_overlap: bool = False) -> None:
5761
5664
assert ws in self .running
5762
5665
else :
5763
5666
assert ws not in self .running
5764
- assert ws .address not in self .idle
5765
- assert ws not in self .saturated
5766
5667
5767
5668
assert ws .long_running .issubset (ws .processing )
5768
5669
if not ws .processing :
5769
5670
assert not ws .occupancy
5770
- if ws .status == Status .running :
5771
- assert ws .address in self .idle
5772
5671
assert not ws .needs_what .keys () & ws .has_what
5773
5672
actual_needs_what : defaultdict [TaskState , int ] = defaultdict (int )
5774
5673
for ts in ws .processing :
@@ -6031,7 +5930,6 @@ def handle_long_running(
6031
5930
ts .prefix .duration_average = (old_duration + compute_duration ) / 2
6032
5931
6033
5932
ws .add_to_long_running (ts )
6034
- self .check_idle_saturated (ws )
6035
5933
6036
5934
self .stimulus_queue_slots_maybe_opened (stimulus_id = stimulus_id )
6037
5935
@@ -6059,16 +5957,12 @@ def handle_worker_status_change(
6059
5957
6060
5958
if ws .status == Status .running :
6061
5959
self .running .add (ws )
6062
- self .check_idle_saturated (ws )
6063
5960
self .transitions (
6064
5961
self .bulk_schedule_unrunnable_after_adding_worker (ws ), stimulus_id
6065
5962
)
6066
5963
self .stimulus_queue_slots_maybe_opened (stimulus_id = stimulus_id )
6067
5964
else :
6068
5965
self .running .discard (ws )
6069
- self .idle .pop (ws .address , None )
6070
- self .idle_task_count .discard (ws )
6071
- self .saturated .discard (ws )
6072
5966
self ._refresh_no_workers_since ()
6073
5967
6074
5968
def handle_request_refresh_who_has (
0 commit comments