8000 Work pool for (more) efficient handling of `blocking_operation_wait`.… · socketry/async@9449e6f · GitHub
[go: up one dir, main page]

Skip to content

Commit 9449e6f

Browse files
authored
Work pool for (more) efficient handling of blocking_operation_wait. (#359)
1 parent 34464e5 commit 9449e6f

File tree

3 files changed

+228
-10
lines changed

3 files changed

+228
-10
lines changed

lib/async/scheduler.rb

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
require_relative "clock"
99
require_relative "task"
10+
require_relative "worker_pool"
1011

1112
require "io/event"
1213

@@ -49,6 +50,7 @@ def initialize(parent = nil, selector: nil)
4950
@idle_time = 0.0
5051

5152
@timers = ::IO::Event::Timers.new
53+
@worker_pool = WorkerPool.new
5254
end
5355

5456
# Compute the scheduler load according to the busy and idle times that are updated by the run loop.
@@ -112,6 +114,11 @@ def close
112114

113115
selector&.close
114116

117+
worker_pool = @worker_pool
118+
@worker_pool = nil
119+
120+
worker_pool&.close
121+
115122
consume
116123
end
117124

@@ -169,8 +176,11 @@ def resume(fiber, *arguments)
169176

170177
# Invoked when a fiber tries to perform a blocking operation which cannot continue. A corresponding call {unblock} must be performed to allow this fiber to continue.
171178
#
172-
179+
# @public Since *Async v2*.
173180
# @asynchronous May only be called on same thread as fiber scheduler.
181+
#
182+
# @parameter blocker [Object] The object that is blocking the fiber.
183+
# @parameter timeout [Float | Nil] The maximum time to block, or if nil, indefinitely.
174184
def block(blocker, timeout)
175185
# $stderr.puts "block(#{blocker}, #{Fiber.current}, #{timeout})"
176186
fiber = Fiber.current
@@ -346,15 +356,7 @@ def process_wait(pid, flags)
346356
# @parameter work [Proc] The work to execute on a background thread.
347357
# @returns [Object] The result of the work.
348358
def blocking_operation_wait(work)
349-
thread = Thread.new(&work)
350-
351-
result = thread.join
352-
353-
thread = nil
354-
355-
return result
356-
ensure
357-
thread&.kill
359+
@worker_pool.call(work)
358360
end
359361

360362
# Run one iteration of the event loop.

lib/async/worker_pool.rb

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
# frozen_string_literal: true
2+
3+
# Released under the MIT License.
4+
# Copyright, 2024, by Samuel Williams.
5+
6+
require "etc"
7+
8+
module Async
9+
# A simple work pool that offloads work to a background thread.
10+
#
11+
# @private
12+
class WorkerPool
13+
class Promise
14+
def initialize(work)
15+
@work = work
16+
@state = :pending
17+
@value = nil
18+
@guard = ::Mutex.new
19+
@condition = ::ConditionVariable.new
20+
@thread = nil
21+
end
22+
23+
def call
24+
work = nil
25+
26+
@guard.synchronize do
27+
@thread = ::Thread.current
28+
29+
return unless work = @work
30+
end
31+
32+
resolve(work.call)
33+
rescue Exception => error
34+
reject(error)
35+
end
36+
37+
private def resolve(value)
38+
@guard.synchronize do
39+
@work = nil
40+
@thread = nil
41+
@value = value
42+
@state = :resolved
43+
@condition.broadcast
44+
end
45+
end
46+
47+
private def reject(error)
48+
@guard.synchronize do
49+
@work = nil
50+
@thread = nil
51+
@value = error
52+
@state = :failed
53+
@condition.broadcast
54+
end
55+
end
56+
57+
def cancel
58+
return unless @work
59+
60+
@guard.synchronize do
61+
@work = nil
62+
@state = :cancelled
63+
@thread&.raise(Interrupt)
64+
end
65+
end
66+
67+
def wait
68+
@guard.synchronize do
69+
while @state == :pending
70+
@condition.wait(@guard)
71+
end
72+
73+
if @state == :failed
74+
raise @value
75+
else
76+
return @value
77+
end
78+
end
79+
end
80+
end
81+
82+
# A handle to the work being done.
83+
class Worker
84+
def initialize
85+
@work = ::Thread::Queue.new
86+
@thread = ::Thread.new(&method(:run))
87+
end
88+
89+
def run
90+
while work = @work.pop
91+
work.call
92+
end
93+
end
94+
95+
def close
96+
if thread = @thread
97+
@thread = nil
98+
thread.kill
99+
end
100+
end
101+
102+
# Call the work and notify the scheduler when it is done.
103+
def call(work)
104+
promise = Promise.new(work)
105+
106+
@work.push(promise)
107+
108+
begin
109+
return promise.wait
110+
ensure
111+
promise.cancel
112+
end
113+
end
114+
end
115+
116+
# Create a new work pool.
117+
#
118+
# @parameter size [Integer] The number of threads to use.
119+
def initialize(size: Etc.nprocessors)
120+
@ready = ::Thread::Queue.new
121+
122+
size.times do
123+
@ready.push(Worker.new)
124+
end
125+
end
126+
127+
# Close the work pool. Kills all outstanding work.
128+
def close
129+
if ready = @ready
130+
@ready = nil
131+
ready.close
132+
133+
while worker = ready.pop
134+
worker.close
135+
end
136+
end
137+
end
138+
139+
# Offload work to a thread.
140+
#
141+
# @parameter work [Proc] The work to be done.
142+
def call(work)
143+
if ready = @ready
144+
worker = ready.pop
145+
146+
begin
147+
worker.call(work)
148+
ensure
149+
ready.push(worker)
150+
end
151+
else
152+
raise RuntimeError, "No worker available!"
153+
end
154+
end
155+
end
156+
end

test/async/worker_pool.rb

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
# frozen_string_literal: true
2+
3+
# Released under the MIT License.
4+
# Copyright, 2022-2024, by Samuel Williams.
5+
# Copyright, 2024, by Patrik Wenger.
6+
7+
require "async/worker_pool"
8+
require "sus/fixtures/async"
9+
10+
describe Async::WorkerPool do
11+
let(:worker_pool) {subject.new(size: 1)}
12+
13+
it "offloads work to a thread" do
14+
result = worker_pool.call(proc do
15+
Thread.current
16+
end)
17+
18+
expect(result).not.to be == Thread.current
19+
end
20+
21+
it "gracefully handles errors" do
22+
expect do
23+
worker_pool.call(proc do
24+
raise ArgumentError, "Oops!"
25+
end)
26+
end.to raise_exception(ArgumentError, message: be == "Oops!")
27+
end
28+
29+
it "can cancel work" do
30+
sleeping = ::Thread::Queue.new
31+
32+
thread = Thread.new do
33+
Thread.current.report_on_exception = false
34+
35+
worker_pool.call(proc do
36+
sleeping.push(true)
37+
sleep(1)
38+
end)
39+
end
40+
41+
# Wait for the worker to start:
42+
sleeping.pop
43+
44+
thread.raise(Interrupt)
45+
46+
expect do
47+
thread.join
48+
end.to raise_exception(Interrupt)
49+
end
50+
51+
with "#close" do
52+
it "can be closed" do
53+
worker_pool.close
54+
55+
expect do
56+
worker_pool.call(proc{})
57+
end.to raise_exception(RuntimeError)
58+
end
59+
end
60+
end

0 commit comments

Comments
 (0)
0