8000 Add `Scheduler#load` and `Async::Idler` for scheduling tasks when idle. · socketry/async@4e586e7 · GitHub
[go: up one dir, main page]

Skip to content

Commit 4e586e7

Browse files
committed
Add Scheduler#load and Async::Idler for scheduling tasks when idle.
`Async::Idler` introduces a `maximum_load` and functions like a semaphore, in that it will schedule tasks until the maximum load is reached.
1 parent 13d3f47 commit 4e586e7

File tree

5 files changed

+137
-1
lines changed

5 files changed

+137
-1
lines changed

async.gemspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,6 @@ Gem::Specification.new do |spec|
2626

2727
spec.add_dependency "console", "~> 1.10"
2828
spec.add_dependency "fiber-annotation"
29-
spec.add_dependency "io-event", "~> 1.1"
29+
spec.add_dependency "io-event", "~> 1.5"
3030
spec.add_dependency "timers", "~> 4.1"
3131
end

examples/load/test.rb

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
#!/usr/bin/env ruby
2+
3+
require_relative '../../lib/async'
4+
require_relative '../../lib/async/idler'
5+
6+
Async do
7+
idler = Async::Idler.new(0.8)
8+
9+
Async do
10+
while true
11+
idler.async do
12+
$stdout.write '.'
13+
while true
14+
sleep 0.1
15+
end
16+
end
17+
end
18+
end
19+
20+
scheduler = Fiber.scheduler
21+
while true
22+
load = scheduler.load
23+
24+
$stdout.write "\nLoad: #{load} "
25+
sleep 1.0
26+
end
27+
end

lib/async/idler.rb

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
# frozen_string_literal: true
2+
3+
# Released under the MIT License.
4+
# Copyright, 2024, by Samuel Williams.
5+
6+
module Async
7+
class Idler
8+
def initialize(maximum_load = 0.8, backoff: 0.01, parent: nil)
9+
@maximum_load = maximum_load
10+
@backoff = backoff
11+
@parent = parent
12+
end
13+
14+
def async(*arguments, parent: (@parent or Task.current), **options, &block)
15+
wait
16+
17+
# It is crucial that we optimistically execute the child task, so that we prevent a tight loop invoking this method from consuming all available resources.
18+
parent.async(*arguments, **options, &block)
19+
end
20+
21+
def wait
22+
scheduler = Fiber.scheduler
23+
backoff = nil
24+
25+
while true
26+
load = scheduler.load
27+
break if load < @maximum_load
28+
29+
if backoff
30+
sleep(backoff)
31+
backoff *= 2.0
32+
else
33+
scheduler.yield
34+
backoff = @backoff
35+
end
36+
end
37+
end
38+
end
39+
end

lib/async/scheduler.rb

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,33 @@ def initialize(parent = nil, selector: nil)
3737

3838
@blocked = 0
3939

40+
@busy_time = 0.0
41+
@idle_time = 0.0
42+
4043
@timers = ::Timers::Group.new
4144
end
4245

46+
# Compute the scheduler load according to the busy and idle times that are updated by the run loop.
47+
# @returns [Float] The load of the scheduler. 0.0 means no load, 1.0 means fully loaded or over-loaded.
48+
def load
49+
total_time = @busy_time + @idle_time
50+
51+
# If the total time is zero, then the load is zero:
52+
return 0.0 if total_time.zero?
53+
54+
# We normalize to a 1 second window:
55+
if total_time > 1.0
56+
ratio = 1.0 / total_time
57+
@busy_time *= ratio
58+
@idle_time *= ratio
59+
60+
# We don't need to divide here as we've already normalised it to a 1s window:
61+
return @busy_time
62+
else
63+
return @busy_time / total_time
64+
end
65+
end
66+
4367
def scheduler_close
4468
# If the execution context (thread) was handling an exception, we want to exit as quickly as possible:
4569
unless $!
@@ -267,6 +291,8 @@ def run_once(timeout = nil)
267291
# @parameter timeout [Float | Nil] The maximum timeout, or if nil, indefinite.
268292
# @returns [Boolean] Whether there is more work to do.
269293
private def run_once!(timeout = 0)
294+
start_time = Async::Clock.now
295+
270296
interval = @timers.wait_interval
271297

272298
# If there is no interval to wait (thus no timers), and no tasks, we could be done:
@@ -288,6 +314,15 @@ def run_once(timeout = nil)
288314

289315
@timers.fire
290316

317+
# Compute load:
318+
end_time = Async::Clock.now
319+
total_duration = end_time - start_time
320+
idle_duration = @selector.idle_duration
321+
busy_duration = total_duration - idle_duration
322+
323+
@busy_time += busy_duration
324+
@idle_time += idle_duration
325+
291326
# The reactor still has work to do:
292327
return true
293328
end

test/async/idler.rb

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
# frozen_string_literal: true
2+
3+
# Released under the MIT License.
4+
# Copyright, 2018-2023, by Samuel Williams.
5+
6+
require 'async/idler'
7+
require 'sus/fixtures/async'
8+
9+
require 'chainable_async'
10+
11+
describe Async::Idler do
12+
include Sus::Fixtures::Async::ReactorContext
13+
let(:idler) {subject.new(0.5)}
14+
15+
it 'can schedule tasks up to the desired load' do
16+
# Generate the load:
17+
Async do
18+
while true
19+
idler.async do
20+
while true
21+
sleep 0.1
22+
end
23+
end
24+
end
25+
end
26+
27+
# This test must be longer than the test window...
28+
sleep 1.1
29+
30+
# Verify that the load is within the desired range:
31+
expect(Fiber.scheduler.load).to be_within(0.1).of(0.5)
32+
end
33+
34+
it_behaves_like ChainableAsync
35+
end

0 commit comments

Comments
 (0)
0