Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions lib/async/node.rb
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,12 @@ def stop(later = false)
end
end

# Wait for this node to complete. By default, nodes cannot be waited on.
# Subclasses like Task override this method to provide waiting functionality.
def wait
nil
end

# Whether the node has been stopped.
def stopped?
@children.nil?
Expand Down
31 changes: 30 additions & 1 deletion lib/async/task.rb
Original file line number Diff line number Diff line change
Expand Up @@ -284,11 +284,40 @@ def wait
begin
@promise.wait
rescue Promise::Cancel
# For backward compatibility, stopped tasks return nil
# For backward compatibility, stopped tasks return nil:
return nil
end
end

# Wait on all non-transient children to complete, recursively, then wait on the task itself, if it is not the current task.
#
# If any child task fails with an exception, that exception will be raised immediately, and remaining children may not be waited on.
#
# @example Waiting on all children.
# Async do |task|
# child = task.async do
# sleep(0.01)
# end
# task.wait_all # Will wait on the child task.
# end
#
# @raises [StandardError] If any child task failed with an exception, that exception will be raised.
# @returns [Object | Nil] The final expression/result of the task's block, or nil if called from within the task.
# @asynchronous This method is thread-safe.
def wait_all
@children&.each do |child|
# Skip transient tasks
next if child.transient?

child.wait_all
end

# Only wait on the task if we're not waiting on ourselves:
unless self.current?
return self.wait
end
end

# Access the result of the task without waiting. May be nil if the task is not completed. Does not raise exceptions.
def result
value = @promise.value
Expand Down
4 changes: 4 additions & 0 deletions releases.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Releases

## Unreleased

- Introduce `Task#wait_all` which recursively waits for all children and self, excepting the current task.

## v2.35.3

- `Async::Clock` now implements `#as_json` and `#to_json` for nicer log formatting.
Expand Down
7 changes: 7 additions & 0 deletions test/async/node.rb
Original file line number Diff line number Diff line change
Expand Up @@ -311,4 +311,11 @@
expect(node.traverse.to_a).to be == [[node, 0], [middle, 1], [child1, 2], [child2, 2]]
end
end

with "#wait" do
it "returns self for a plain node" do
result = node.wait
expect(result).to be_nil
end
end
end
153 changes: 153 additions & 0 deletions test/async/task.rb
Original file line number Diff line number Diff line change
Expand Up @@ -870,6 +870,159 @@ def sleep_forever
end
end

with "#wait_all" do
it "will wait on all child tasks to complete" do
results = []

reactor.run do |parent|
child1 = parent.async do |child|
child.yield
results << :child1
end

child2 = parent.async do |child|
child.yield
results << :child2
end

parent.wait_all
results << :parent
end

expect(results).to be == [:child1, :child2, :parent]
end

it "will wait recursively on nested child tasks" do
results = []

reactor.run do |parent|
child = parent.async do |child|
grandchild = child.async do |grandchild|
grandchild.yield
results << :grandchild
end

child.yield
results << :child
end

parent.wait_all
results << :parent
end

expect(results).to be == [:grandchild, :child, :parent]
end

it "will skip transient tasks" do
results = []

reactor.run do |parent|
child = parent.async do |child|
child.yield
results << :child
end

transient = parent.async(transient: true) do
sleep(0.1)
results << :transient
end

parent.wait_all
results << :parent
end

# Transient task should not have completed
expect(results).to be == [:child, :parent]
end

it "will handle tasks with no children" do
reactor.run do |parent|
result = parent.wait_all
expect(result).to be_nil
end
end

it "will wait on multiple levels of nesting" do
results = []

reactor.run do |parent|
child1 = parent.async do |child|
grandchild1 = child.async do |grandchild|
grandchild.yield
results << :grandchild1
end
child.yield
results << :child1
end

child2 = parent.async do |child|
grandchild2 = child.async do |grandchild|
grandchild.yield
results << :grandchild2
end
child.yield
results << :child2
end

parent.wait_all
results << :parent
end

# All tasks should complete in order
expect(results).to be(:include?, :grandchild1)
expect(results).to be(:include?, :grandchild2)
expect(results).to be(:include?, :child1)
expect(results).to be(:include?, :child2)
expect(results.last).to be == :parent
end

it "returns nil when called from within the task" do
reactor.run do |parent|
child = parent.async do |child|
child.yield
end

result = parent.wait_all
expect(result).to be_nil
end
end

it "returns the task result when called from outside" do
parent = reactor.async do |parent|
child = parent.async do |child|
child.yield
end
:result
end

reactor.run

result = parent.wait_all
expect(result).to be == :result
end

it "will propagate exceptions from child tasks" do
failed_child = nil

reactor.run do |parent|
failed_child = parent.async(finished: false) do |child|
child.yield
raise RuntimeError, "child task failed"
end

# Wait for the failed child to fail first
reactor.run

expect(failed_child).to be(:finished?)

# wait_all should propagate the exception when it calls wait on the failed child
expect do
parent.wait_all
end.to raise_exception(RuntimeError, message: be =~ /child task failed/)
end
end
end

with "#result" do
it "does not raise exception" do
task = reactor.async do |task|
Expand Down
Loading