diff --git a/lib/async/node.rb b/lib/async/node.rb index 78d59cc7..bb287e9a 100644 --- a/lib/async/node.rb +++ b/lib/async/node.rb @@ -295,6 +295,12 @@ def stop(later = false) end end + # Wait for this node to complete. By default, nodes cannot be waited on. + # Subclasses like Task override this method to provide waiting functionality. + def wait + nil + end + # Whether the node has been stopped. def stopped? @children.nil? diff --git a/lib/async/task.rb b/lib/async/task.rb index 151913b8..80307138 100644 --- a/lib/async/task.rb +++ b/lib/async/task.rb @@ -284,11 +284,40 @@ def wait begin @promise.wait rescue Promise::Cancel - # For backward compatibility, stopped tasks return nil + # For backward compatibility, stopped tasks return nil: return nil end end + # Wait on all non-transient children to complete, recursively, then wait on the task itself, if it is not the current task. + # + # If any child task fails with an exception, that exception will be raised immediately, and remaining children may not be waited on. + # + # @example Waiting on all children. + # Async do |task| + # child = task.async do + # sleep(0.01) + # end + # task.wait_all # Will wait on the child task. + # end + # + # @raises [StandardError] If any child task failed with an exception, that exception will be raised. + # @returns [Object | Nil] The final expression/result of the task's block, or nil if called from within the task. + # @asynchronous This method is thread-safe. + def wait_all + @children&.each do |child| + # Skip transient tasks + next if child.transient? + + child.wait_all + end + + # Only wait on the task if we're not waiting on ourselves: + unless self.current? + return self.wait + end + end + # Access the result of the task without waiting. May be nil if the task is not completed. Does not raise exceptions. def result value = @promise.value diff --git a/releases.md b/releases.md index 6ddee2cf..ed68d25f 100644 --- a/releases.md +++ b/releases.md @@ -1,5 +1,9 @@ # Releases +## Unreleased + + - Introduce `Task#wait_all` which recursively waits for all children and self, excepting the current task. + ## v2.35.3 - `Async::Clock` now implements `#as_json` and `#to_json` for nicer log formatting. diff --git a/test/async/node.rb b/test/async/node.rb index 3c1f6de3..cc76f259 100644 --- a/test/async/node.rb +++ b/test/async/node.rb @@ -311,4 +311,11 @@ expect(node.traverse.to_a).to be == [[node, 0], [middle, 1], [child1, 2], [child2, 2]] end end + + with "#wait" do + it "returns self for a plain node" do + result = node.wait + expect(result).to be_nil + end + end end diff --git a/test/async/task.rb b/test/async/task.rb index 8c1ca8ed..01da342b 100644 --- a/test/async/task.rb +++ b/test/async/task.rb @@ -870,6 +870,159 @@ def sleep_forever end end + with "#wait_all" do + it "will wait on all child tasks to complete" do + results = [] + + reactor.run do |parent| + child1 = parent.async do |child| + child.yield + results << :child1 + end + + child2 = parent.async do |child| + child.yield + results << :child2 + end + + parent.wait_all + results << :parent + end + + expect(results).to be == [:child1, :child2, :parent] + end + + it "will wait recursively on nested child tasks" do + results = [] + + reactor.run do |parent| + child = parent.async do |child| + grandchild = child.async do |grandchild| + grandchild.yield + results << :grandchild + end + + child.yield + results << :child + end + + parent.wait_all + results << :parent + end + + expect(results).to be == [:grandchild, :child, :parent] + end + + it "will skip transient tasks" do + results = [] + + reactor.run do |parent| + child = parent.async do |child| + child.yield + results << :child + end + + transient = parent.async(transient: true) do + sleep(0.1) + results << :transient + end + + parent.wait_all + results << :parent + end + + # Transient task should not have completed + expect(results).to be == [:child, :parent] + end + + it "will handle tasks with no children" do + reactor.run do |parent| + result = parent.wait_all + expect(result).to be_nil + end + end + + it "will wait on multiple levels of nesting" do + results = [] + + reactor.run do |parent| + child1 = parent.async do |child| + grandchild1 = child.async do |grandchild| + grandchild.yield + results << :grandchild1 + end + child.yield + results << :child1 + end + + child2 = parent.async do |child| + grandchild2 = child.async do |grandchild| + grandchild.yield + results << :grandchild2 + end + child.yield + results << :child2 + end + + parent.wait_all + results << :parent + end + + # All tasks should complete in order + expect(results).to be(:include?, :grandchild1) + expect(results).to be(:include?, :grandchild2) + expect(results).to be(:include?, :child1) + expect(results).to be(:include?, :child2) + expect(results.last).to be == :parent + end + + it "returns nil when called from within the task" do + reactor.run do |parent| + child = parent.async do |child| + child.yield + end + + result = parent.wait_all + expect(result).to be_nil + end + end + + it "returns the task result when called from outside" do + parent = reactor.async do |parent| + child = parent.async do |child| + child.yield + end + :result + end + + reactor.run + + result = parent.wait_all + expect(result).to be == :result + end + + it "will propagate exceptions from child tasks" do + failed_child = nil + + reactor.run do |parent| + failed_child = parent.async(finished: false) do |child| + child.yield + raise RuntimeError, "child task failed" + end + + # Wait for the failed child to fail first + reactor.run + + expect(failed_child).to be(:finished?) + + # wait_all should propagate the exception when it calls wait on the failed child + expect do + parent.wait_all + end.to raise_exception(RuntimeError, message: be =~ /child task failed/) + end + end + end + with "#result" do it "does not raise exception" do task = reactor.async do |task|