8000 Better handling of bare `Async` and `Sync` blocks when a scheduler is… · socketry/async@b5494cc · GitHub
[go: up one dir, main page]

Skip to content

Commit b5494cc

Browse files
authored
Better handling of bare Async and Sync blocks when a scheduler is defined. (#340)
* Allow `Async` and `Sync` to be called in non-task contexts. * Ensure the scheduler is not leaked between tests.
1 parent fc6b5d7 commit b5494cc

File tree

8 files changed

+117
-30
lines changed

8 files changed

+117
-30
lines changed

lib/async/task.rb

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,13 @@ def self.yield
6767
Fiber.scheduler.transfer
6868
end
6969

70+
# Run the given block of code in a task, asynchronously, in the given scheduler.
71+
def self.run(scheduler, *arguments, **options, &block)
72+
self.new(scheduler, **options, &block).tap do |task|
73+
task.run(*arguments)
74+
end
75+
end
76+
7077
# Create a new task.
7178
# @parameter reactor [Reactor] the reactor this task will run within.
7279
# @parameter parent [Task] the parent task.

lib/kernel/async.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ module Kernel
2424
def Async(...)
2525
if current = ::Async::Task.current?
2626
return current.async(...)
27+
elsif scheduler = Fiber.scheduler
28+
::Async::Task.run(scheduler, ...)
2729
else
2830
# This calls Fiber.set_scheduler(self):
2931
reactor = ::Async::Reactor.new

lib/kernel/sync.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ module Kernel
1818
def Sync(&block)
1919
if task = ::Async::Task.current?
2020
yield task
21+
elsif scheduler = Fiber.scheduler
22+
::Async::Task.run(scheduler, &block).wait
2123
else
2224
# This calls Fiber.set_scheduler(self):
2325
reactor = Async::Reactor.new

readme.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ Please see the [project documentation](https://socketry.github.io/async/) for mo
3535

3636
Please see the [project releases](https://socketry.github.io/async/releases/index) for all releases.
3737

38+
### Next
39+
40+
- [Better Handling of Async and Sync in Nested Fibers](https://socketry.github.io/async/releases/index#better-handling-of-async-and-sync-in-nested-fibers)
41+
3842
## See Also
3943

4044
- [async-http](https://github.com/socketry/async-http) — Asynchronous HTTP client/server.

releases.md

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
# Changes
2+
3+
## Next
4+
5+
### Better Handling of Async and Sync in Nested Fibers
6+
7+
Interleaving bare fibers within `Async` and `Sync` blocks should not cause problems, but it presents a number of issues in the current implementation. Tracking the parent-child relationship between tasks, when they are interleaved with bare fibers, is difficult. The current implementation assumes that if there is no parent task, then it should create a new reactor. This is not always the case, as the parent task might not be visible due to nested Fibers. As a result, `Async` will create a new reactor, trying to stop the existing one, causing major internal consistency issues.
8+
9+
I encountered this issue when trying to use `Async` within a streaming response in Rails. The `protocol-rack` [uses a normal fiber to wrap streaming responses](https://github.com/socketry/protocol-rack/blob/cb1ca44e9deadb9369bdb2ea03416556aa927c5c/lib/protocol/rack/body/streaming.rb#L24-L28), and if you try to use `Async` within it, it will create a new reactor, causing the server to lock up.
10+
11+
Ideally, `Async` and `Sync` helpers should work when any `Fiber.scheduler` is defined. Right now, it's unrealistic to expect `Async::Task` to work in any scheduler, but at the very least, the following should work:
12+
13+
```ruby
14+
reactor = Async::Reactor.new # internally calls Fiber.set_scheduler
15+
16+
# This should run in the above reactor, rather than creating a new one.
17+
Async do
18+
puts "Hello World"
19+
end
20+
```
21+
22+
In order to do this, bare `Async` and `Sync` blocks should use `Fiber.scheduler` as a parent if possible.
23+
24+
See <https://github.com/socketry/async/pull/340> for more details.

test/async/reactor.rb

Lines changed: 56 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@
1010

1111
describe Async::Reactor do
1212
let(:reactor) {subject.new}
13+
14+
after do
15+
Fiber.set_scheduler(nil)
16+
end
1317

1418
with '#run' do
1519
it "can run tasks on different fibers" do
@@ -218,24 +222,6 @@
218222
expect(result).to be_a(Async::Task)
219223
end
220224

221-
with '#async' do
222-
include Sus::Fixtures::Async::ReactorContext
223-
224-
it "can pass in arguments" do
225-
reactor.async(:arg) do |task, arg|
226-
expect(arg).to be == :arg
227-
end.wait
228-
end
229-
230-
it "passes in the correct number of arguments" do
231-
reactor.async(:arg1, :arg2, :arg3) do |task, arg1, arg2, arg3|
232-
expect(arg1).to be == :arg1
233-
expect(arg2).to be == :arg2
234-
expect(arg3).to be == :arg3
235-
end.wait
236-
end
237-
end
238-
239225
with '#with_timeout' do
240226
let(:duration) {1}
241227

@@ -282,16 +268,57 @@
282268
end
283269
end
284270

285-
it "validates scheduler assignment" do
286-
# Assign the scheduler:
287-
reactor = self.reactor
288-
289-
# Close the previous scheduler:
290-
Async {}
291-
292-
expect do
293-
# The reactor is closed:
294-
reactor.async {}
295-
end.to raise_exception(Async::Scheduler::ClosedError)
271+
with 'Kernel.Async' do
272+
it "reuses existing scheduler" do
273+
# Assign the scheduler:
274+
reactor = self.reactor
275+
276+
# Re-use the previous scheduler:
277+
state = nil
278+
Async do
279+
state = :started
280+
end
281+
282+
reactor.run
283+
284+
expect(state).to be == :started
285+
end
286+
end
287+
288+
with 'Kernel.Sync' do
289+
it "reuses existing scheduler" do
290+
# Assign the scheduler:
291+
reactor = self.reactor
292+
293+
# Re-use the previous scheduler:
294+
state = nil
295+
Sync do |task|
296+
state = :started
297+
end
298+
299+
reactor.run
300+
301+
expect(state).to be == :started
302+
end
303+
end
304+
end
305+
306+
describe Async::Reactor do
307+
include Sus::Fixtures::Async::ReactorContext
308+
309+
with '#async' do
310+
it "can pass in arguments" do
311+
reactor.async(:arg) do |task, arg|
312+
expect(arg).to be == :arg
313+
end.wait
314+
end
315+
316+
it "passes in the correct number of arguments" do
317+
reactor.async(:arg1, :arg2, :arg3) do |task, arg1, arg2, arg3|
318+
expect(arg1).to be == :arg1
319+
expect(arg2).to be == :arg2
320+
expect(arg3).to be == :arg3
321+
end.wait
322+
end
296323
end
297324
end

test/async/task.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
let(:reactor) {Async::Reactor.new}
1818

1919
after do
20-
reactor.close
20+
Fiber.set_scheduler(nil)
2121
end
2222

2323
with '#annotate' do

test/fiber.rb

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,27 @@
3232

3333
expect(error).to be_a(Async::Stop)
3434
end
35+
36+
it "can nest child tasks within a resumed fiber" do
37+
skip_unless_minimum_ruby_version("3.3.4")
38+
39+
variable = Async::Variable.new
40+
error = nil
41+
42+
Sync do |task|
43+
child_task = task.async do
44+
Fiber.new do
45+
Async do
46+
variable.value
47+
end.wait
48+
end.resume
49+
end
50+
51+
expect(child_task).to be(:running?)
52+
53+
variable.value = true
54+
end
55+
end
3556
end
3657

3758
with '.schedule' do

0 commit comments

Comments
 (0)
0