Remove idle and saturated sets from scheduler · dask/distributed@393ee21 · GitHub
[go: up one dir, main page]

Skip to content

Commit 393ee21

Browse files
committed
Remove idle and saturated sets from scheduler
1 parent 004fafb commit 393ee21

File tree

4 files changed

+82
-193
lines changed

4 files changed

+82
-193
lines changed

distributed/scheduler.py

Lines changed: 7 additions & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -1764,11 +1764,8 @@ def __init__(
17641764
self.clients["fire-and-forget"] = ClientState("fire-and-forget")
17651765
self.extensions = {}
17661766
self.host_info = host_info
1767-
self.idle = SortedDict()
1768-
self.idle_task_count = set()
17691767
self.n_tasks = 0
17701768
self.resources = resources
1771-
self.saturated = set()
17721769
self.tasks = tasks
17731770
self.replicated_tasks = {
17741771
ts for ts in self.tasks.values() if len(ts.who_has or ()) > 1
@@ -1852,7 +1849,6 @@ def __pdict__(self) -> dict[str, Any]:
18521849
return {
18531850
"bandwidth": self.bandwidth,
18541851
"resources": self.resources,
1855-
"saturated": self.saturated,
18561852
"unrunnable": self.unrunnable,
18571853
"queued": self.queued,
18581854
"n_tasks": self.n_tasks,
@@ -1867,7 +1863,6 @@ def __pdict__(self) -> dict[str, Any]:
18671863
"extensions": self.extensions,
18681864
"clients": self.clients,
18691865
"workers": self.workers,
1870-
"idle": self.idle,
18711866
"host_info": self.host_info,
18721867
}
18731868

@@ -2282,7 +2277,7 @@ def decide_worker_rootish_queuing_disabled(
22822277
# See root-ish-ness note below in `decide_worker_rootish_queuing_enabled`
22832278
assert math.isinf(self.WORKER_SATURATION) or not ts._queueable
22842279

2285-
pool = self.idle.values() if self.idle else self.running
2280+
pool = self.running
22862281
if not pool:
22872282
return None
22882283

@@ -2347,22 +2342,16 @@ def decide_worker_rootish_queuing_enabled(self) -> WorkerState | None:
23472342
# then add that assertion here (and actually pass in the task).
23482343
assert not math.isinf(self.WORKER_SATURATION)
23492344

2350-
if not self.idle_task_count:
2351-
# All workers busy? Task gets/stays queued.
2345+
if not self.running:
23522346
return None
23532347

23542348
# Just pick the least busy worker.
23552349
# NOTE: this will lead to worst-case scheduling with regard to co-assignment.
2356-
ws = min(
2357-
self.idle_task_count,
2358-
key=lambda ws: len(ws.processing) / ws.nthreads,
2359-
)
2350+
ws = min(self.running, key=lambda ws: len(ws.processing) / ws.nthreads)
2351+
if _worker_full(ws, self.WORKER_SATURATION):
2352+
return None
23602353
if self.validate:
23612354
assert self.workers.get(ws.address) is ws
2362-
assert not _worker_full(ws, self.WORKER_SATURATION), (
2363-
ws,
2364-
_task_slots_available(ws, self.WORKER_SATURATION),
2365-
)
23662355
assert ws in self.running, (ws, self.running)
23672356

23682357
return ws
@@ -2406,7 +2395,7 @@ def decide_worker_non_rootish(self, ts: TaskState) -> WorkerState | None:
24062395
# dependencies, but its group is also smaller than the cluster.
24072396

24082397
# Fastpath when there are no related tasks or restrictions
2409-
worker_pool = self.idle or self.workers
2398+
worker_pool = self.workers
24102399
# FIXME workers is a SortedDict declared as a dict
24112400
# because sortedcontainers is not annotated
24122401
wp_vals = cast("Sequence[WorkerState]", worker_pool.values())
@@ -2906,7 +2895,6 @@ def _transition_waiting_queued(self, key: Key, stimulus_id: str) -> RecsMsgs:
29062895
ts = self.tasks[key]
29072896

29082897
if self.validate:
2909-
assert not self.idle_task_count, (ts, self.idle_task_count)
29102898
self._validate_ready(ts)
29112899

29122900
ts.state = "queued"
@@ -3094,63 +3082,6 @@ def is_rootish(self, ts: TaskState) -> bool:
30943082
and sum(map(len, tg.dependencies)) < 5
30953083
)
30963084

3097-
def check_idle_saturated(self, ws: WorkerState, occ: float = -1.0) -> None:
3098-
"""Update the status of the idle and saturated state
3099-
3100-
The scheduler keeps track of workers that are ..
3101-
3102-
- Saturated: have enough work to stay busy
3103-
- Idle: do not have enough work to stay busy
3104-
3105-
They are considered saturated if they both have enough tasks to occupy
3106-
all of their threads, and if the expected runtime of those tasks is
3107-
large enough.
3108-
3109-
If ``distributed.scheduler.worker-saturation`` is not ``inf``
3110-
(scheduler-side queuing is enabled), they are considered idle
3111-
if they have fewer tasks processing than the ``worker-saturation``
3112-
threshold dictates.
3113-
3114-
Otherwise, they are considered idle if they have fewer tasks processing
3115-
than threads, or if their tasks' total expected runtime is less than half
3116-
the expected runtime of the same number of average tasks.
3117-
3118-
This is useful for load balancing and adaptivity.
3119-
"""
3120-
if self.total_nthreads == 0 or ws.status == Status.closed:
3121-
return
3122-
if occ < 0:
3123-
occ = ws.occupancy
3124-
3125-
p = len(ws.processing)
3126-
3127-
self.saturated.discard(ws)
3128-
if ws.status != Status.running:
3129-
self.idle.pop(ws.address, None)
3130-
elif self.is_unoccupied(ws, occ, p):
3131-
self.idle[ws.address] = ws
3132-
else:
3133-
self.idle.pop(ws.address, None)
3134-
nc = ws.nthreads
3135-
if p > nc:
3136-
pending = occ * (p - nc) / (p * nc)
3137-
if 0.4 < pending > 1.9 * (self.total_occupancy / self.total_nthreads):
3138-
self.saturated.add(ws)
3139-
3140-
if not _worker_full(ws, self.WORKER_SATURATION) and ws.status == Status.running:
3141-
self.idle_task_count.add(ws)
3142-
else:
3143-
self.idle_task_count.discard(ws)
3144-
3145-
def is_unoccupied(
3146-
self, ws: WorkerState, occupancy: float, nprocessing: int
3147-
) -> bool:
3148-
nthreads = ws.nthreads
3149-
return (
3150-
nprocessing < nthreads
3151-
or occupancy < nthreads * (self.total_occupancy / self.total_nthreads) / 2
3152-
)
3153-
31543085
def get_comm_cost(self, ts: TaskState, ws: WorkerState) -> float:
31553086
"""
31563087
Get the estimated communication cost (in s.) to compute the task
@@ -3357,7 +3288,6 @@ def _add_to_processing(
33573288
ts.processing_on = ws
33583289
ts.state = "processing"
33593290
self.acquire_resources(ts, ws)
3360-
self.check_idle_saturated(ws)
33613291
self.n_tasks += 1
33623292

33633293
if ts.actor:
@@ -3423,7 +3353,6 @@ def _exit_processing_common(self, ts: TaskState) -> WorkerState | None:
34233353
if self.workers.get(ws.address) is not ws: # may have been removed
34243354
return None
34253355

3426-
self.check_idle_saturated(ws)
34273356
self.release_resources(ts, ws)
34283357

34293358
return ws
@@ -4547,10 +4476,6 @@ async def add_worker(
45474476
metrics=metrics,
45484477
)
45494478

4550-
# Do not need to adjust self.total_occupancy as self.occupancy[ws] cannot
4551-
# exist before this.
4552-
self.check_idle_saturated(ws)
4553-
45544479
self.stream_comms[address] = BatchedSend(interval="5ms", loop=self.loop)
45554480

45564481
awaitables = []
@@ -5167,13 +5092,11 @@ def stimulus_queue_slots_maybe_opened(self, *, stimulus_id: str) -> None:
51675092
so any tasks that became runnable are already in ``processing``. Otherwise,
51685093
overproduction can occur if queued tasks get scheduled before downstream tasks.
51695094
5170-
Must be called after `check_idle_saturated`; i.e. `idle_task_count` must be up to date.
51715095
"""
51725096
if not self.queued:
51735097
return
51745098
slots_available = sum(
5175-
_task_slots_available(ws, self.WORKER_SATURATION)
5176-
for ws in self.idle_task_count
5099+
_task_slots_available(ws, self.WORKER_SATURATION) for ws in self.running
51775100
)
51785101
if slots_available == 0:
51795102
return
@@ -5403,9 +5326,6 @@ async def remove_worker(
54035326
self.rpc.remove(address)
54045327
del self.stream_comms[address]
54055328
del self.aliases[ws.name]
5406-
self.idle.pop(ws.address, None)
5407-
self.idle_task_count.discard(ws)
5408-
self.saturated.discard(ws)
54095329
del self.workers[address]
54105330
self._workers_removed_total += 1
54115331
ws.status = Status.closed
@@ -5734,23 +5654,6 @@ def validate_state(self, allow_overlap: bool = False) -> None:
57345654
if not (set(self.workers) == set(self.stream_comms)):
57355655
raise ValueError("Workers not the same in all collections")
57365656

5737-
assert self.running.issuperset(self.idle.values()), (
5738-
self.running.copy(),
5739-
set(self.idle.values()),
5740-
)
5741-
assert self.running.issuperset(self.idle_task_count), (
5742-
self.running.copy(),
5743-
self.idle_task_count.copy(),
5744-
)
5745-
assert self.running.issuperset(self.saturated), (
5746-
self.running.copy(),
5747-
self.saturated.copy(),
5748-
)
5749-
assert self.saturated.isdisjoint(self.idle.values()), (
5750-
self.saturated.copy(),
5751-
set(self.idle.values()),
5752-
)
5753-
57545657
task_prefix_counts: defaultdict[str, int] = defaultdict(int)
57555658
for w, ws in self.workers.items():
57565659
assert isinstance(w, str), (type(w), w)
@@ -5761,14 +5664,10 @@ def validate_state(self, allow_overlap: bool = False) -> None:
57615664
assert ws in self.running
57625665
else:
57635666
assert ws not in self.running
5764-
assert ws.address not in self.idle
5765-
assert ws not in self.saturated
57665667

57675668
assert ws.long_running.issubset(ws.processing)
57685669
if not ws.processing:
57695670
assert not ws.occupancy
5770-
if ws.status == Status.running:
5771-
assert ws.address in self.idle
57725671
assert not ws.needs_what.keys() & ws.has_what
57735672
actual_needs_what: defaultdict[TaskState, int] = defaultdict(int)
57745673
for ts in ws.processing:
@@ -6031,7 +5930,6 @@ def handle_long_running(
60315930
ts.prefix.duration_average = (old_duration + compute_duration) / 2
60325931

60335932
ws.add_to_long_running(ts)
6034-
self.check_idle_saturated(ws)
60355933

60365934
self.stimulus_queue_slots_maybe_opened(stimulus_id=stimulus_id)
60375935

@@ -6059,16 +5957,12 @@ def handle_worker_status_change(
60595957

60605958
if ws.status == Status.running:
60615959
self.running.add(ws)
6062-
self.check_idle_saturated(ws)
60635960
self.transitions(
60645961
self.bulk_schedule_unrunnable_after_adding_worker(ws), stimulus_id
60655962
)
60665963
self.stimulus_queue_slots_maybe_opened(stimulus_id=stimulus_id)
60675964
else:
60685965
self.running.discard(ws)
6069-
self.idle.pop(ws.address, None)
6070-
self.idle_task_count.discard(ws)
6071-
self.saturated.discard(ws)
60725966
self._refresh_no_workers_since()
60735967

60745968
def handle_request_refresh_who_has(

0 commit comments

Comments
 (0)
0