Conversation

@vineetbansal commented Oct 23, 2025

Summary

Include a summary of major changes in bullet points:

I was talking to @Andrew-S-Rosen about implementing a @flow decorator for jobflow, following up on a previous discussion on this topic. Here's an initial stab at it. It uses a "flow context" (an idea also used in the Prefect library) so that a job can figure out whether it's running inside a decorated flow function and, if so, add itself to the flow's job list. I believe this should help us get around the "updating global variables" issue mentioned in a related PR in the linked discussion.

  • Added a @flow decorator
  • Added some tests, one for a dynamic run with a replacement.

Additional dependencies introduced (if any)

None

TODO (if any)

I have almost no history with jobflow, so I'm certainly not thinking of all use cases; any pointers (initially failing tests that I can add to cover different scenarios before tackling them) will be welcome.

Checklist

Work-in-progress pull requests are encouraged, but please put [WIP] in the pull request
title.

Before a pull request can be merged, the following items must be checked:

  • Code is in the standard Python style.
    The easiest way to handle this is to run the following in the correct sequence on
    your local machine. Start by running black on your new code. This will
    automatically reformat your code to PEP8 conventions and remove most issues. Then
    run pycodestyle, followed by flake8.
  • Docstrings have been added in the Numpy docstring format.
    Run pydocstyle on your code.
  • Type annotations are highly encouraged. Run mypy to
    type check your code.
  • Tests have been added for any new functionality or bug fixes.
  • All linting and tests pass.

Note that the CI system will run all the above checks. But it will be much more
efficient if you fix most errors before submitting the PR. It is highly
recommended that you use the pre-commit hook provided in the repository. Simply
pip install pre-commit and then pre-commit install, and a check will be run
before commits are allowed.

@vineetbansal vineetbansal marked this pull request as draft October 23, 2025 18:40
@vineetbansal (Author)

@Andrew-S-Rosen, @davidwaroquiers, @utf

@utf utf marked this pull request as ready for review October 24, 2025 06:17
@Andrew-S-Rosen (Member) commented Oct 26, 2025

Also pinging @gpetretto in case he has any thoughts about what considerations might come into play in getting this or a similar approach to work nicely with Jobflow-Remote.

@vineetbansal (Author) commented Oct 27, 2025

At a high level, to break down what's happening:

  • The flow decorator sets a context variable and returns a DecoratedFlow, a subclass of Flow, which saves the callable, args and kwargs (see the toy sketch after this list).
  • A Job, when run, checks whether it's running in the context of a flow decorator and, if so, adds itself to the DecoratedFlow (much as a user composing a traditional Flow would have done explicitly).
  • run_locally runs the DecoratedFlow by calling it with its saved args/kwargs, and at the end, resolves any references in the output.
  • When resolving output references, some extra care is taken to resolve lists/dicts etc.
  • If the job has been replaced by another, the output of the replaced job is returned instead.
  • The latest run (as determined by job_index) of a job is used to determine what output will be returned for that job. This seems reasonable to me, but I'm making some assumptions here (job indices always increase, only the latest one is of interest to the caller, etc.), so perhaps this could use a thorough review and more test cases.
  • The legacy behavior is retained (returning the entire response dictionary) for traditional Flow classes (i.e. not DecoratedFlow).
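
To make this concrete, here is a minimal, self-contained toy sketch of the context pattern (an illustration only, not the PR's actual code; the real DecoratedFlow carries much more state, and only the names flow_build_context/_current_flow_context mirror the PR):

import contextvars
from contextlib import contextmanager

_current_flow_context: contextvars.ContextVar = contextvars.ContextVar(
    "flow_context", default=None
)

@contextmanager
def flow_build_context(flow):
    # make `flow` visible to any job created while the body runs
    token = _current_flow_context.set(flow)
    try:
        yield flow
    finally:
        _current_flow_context.reset(token)

class ToyJob:
    def __init__(self, name):
        self.name = name
        parent = _current_flow_context.get()
        if parent is not None:  # created inside a decorated flow function
            parent.jobs.append(self)

class ToyDecoratedFlow:
    def __init__(self, fn):
        self.fn, self.jobs = fn, []

    def build(self, *args, **kwargs):
        with flow_build_context(self):
            return self.fn(*args, **kwargs)

f = ToyDecoratedFlow(lambda: [ToyJob("a"), ToyJob("b")])
f.build()
assert [j.name for j in f.jobs] == ["a", "b"]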

Parallel changes would have to be made in jobflow-remote if this is on the right path, of course.

I've added some more tests to cover what I think would be typical scenarios in using the @flow decorator:


@flow
def my_flow(a, b):
    # typical case
    return add(a, b).output  # or the inadvertent shorthand `return add(a, b)`, which existing code handles correctly

@flow
def my_flow():
    # some jobs are run, but only for their side effects
    return 42

@flow
def my_flow(a, b):
    # some complex structure with hashable keys but jobs/flows as values
    return [add(a, b), add(b, a).output], {
        "foo": add(a, b).output,
        "bar": [add(a, b), subtract(a, b)],
    }

# replaced job in a flow
@job
def replace_job(x, y):
    another_job = subtract(x, y)
    return Response(replace=another_job)

@flow
def my_flow(a, b):
    return add(a, b).output

@gpetretto (Contributor)

Hi @vineetbansal and @Andrew-S-Rosen,
thanks a lot for implementing this. I have made some tests and here are my initial comments.

  • It is not clear why, for a DecoratedFlow, the output should be different from the standard case of a normal Flow. In the example below I would expect the outputs of the two executions to be equivalent.
    @flow
    def my_flow(a, b):
        j = add(a, b)
        return j

    j = add(3, 4)
    f = Flow(j, output=j.output)

    result = run_locally(f, ensure_success=True)
    result = run_locally(my_flow(3, 4), ensure_success=True)
    This, however, only concerns how run_locally generates the outputs.
  • Just to clarify, I suppose there is no aim of executing arbitrary code inside a flow decorated function, aside from combining jobs. For example, the following code raises an error:
    @flow
    def my_flow(a, b):
        j1 = add(a, b)
        x = j1.output + 5
        j2 = add(x, b)
        return j2.output
    I would not see any reasonable way of implementing this, especially for jobflow-remote or fireworks executions. Just for my understanding, is something similar possible in prefect?
  • It seems that it is not possible to nest multiple flow decorated functions. The following code does not execute any Job and returns None:
    @flow
    def add_single(a, b):
        j1 = add(a, b)
        return add(j1.output, 2)
    
    @flow
    def add_combine(a, b):
        f1 = add_single(a, b)
        return add_single(f1.output, 3)
    
    f = add_combine(1, 2)
    result = run_locally(f, ensure_success=True)
    Maybe here it is possible to change the flow decorator so that, when the function is called, it can also check whether it is inside the flow_build_context and trigger a recursive unravelling of the flow functions? In general I would expect this to work more or less like the following code based on Makers:
    class AddSingleMaker(Maker):
        name = "single"
    
        def make(self, a, b):
            j1 = add(a, b)
            j2 = add(j1.output, 2)
            return Flow([j1, j2], output=j2.output)
    
    class AddCombineMaker(Maker):
        name = "combine"
    
        def make(self, a, b):
            f1 = AddSingleMaker().make(1, 2)
            f2 = AddSingleMaker().make(f1.output, 3)
            return Flow([f1, f2], output=f2.output)
    
    f = AddCombineMaker().make(1, 2)
    result = run_locally(f, ensure_success=True)
  • Concerning jobflow-remote, as things stand I now think it would be relatively easy to integrate this. My understanding is that the flow decorated function should just be used to first generate the "complete" Flow object. I have tried the code below in jobflow-remote and it seems to be working as expected:
    @flow
    def my_flow(a, b):
        j = add(a, b)
        return j

    print("create flow")
    f = my_flow(1, 2)

    with flow_build_context(f):
        output = f.fn(*f.args, **f.kwargs)

    f.name = f.fn.__name__
    f.output = output.output

    submit_flow(f, worker="local_shell")
    So I can basically include this check below in the submit_flow function and I would expect this to work:
    if isinstance(flow, DecoratedFlow):
      with flow_build_context(flow):
          output = flow.fn(*flow.args, **flow.kwargs)
      
      flow.name = flow.fn.__name__
      flow.output = output.output
    Maybe the setting of name and output can even be done directly by the flow decorator?
    If instead you expect the code to evolve to include code execution as in the example above, or other functionalities, things may get more complicated.
  • If this solution works, I would expect that it may be applied in the same way for flow_to_workflow for Fireworks.

@vineetbansal (Author)

Thanks @gpetretto - your comments were most insightful, and after a few iterations of over-complicating and then simplifying things on my end, I think I've ended up at more or less the same place as you had intuited.

It is not clear why for a DecoratedFlow the output should be different from the standard case of a normal Flow..

Indeed - my original intuition was that the @flow decorator could be used on arbitrary functions, not necessarily ones for accumulating Jobs and Flows, so I was trying to resolve the output (to an admittedly incomplete extent) before returning it. I see that all the resolving is happening in the responses dict anyway, so that simplifies things on my end (and gets rid of a few test cases related to this return behavior).

Just to clarify, I suppose there is no aim at executing any piece of code inside a flow decorated function, aside from combining jobs actions. .. I would not see any reasonable way of implementing this, especially for jobflow-remote or fireworks executions.

Yes - I talked to @Andrew-S-Rosen and we're not using @flow decorators (across workflow libraries) to execute code other than as glue to create a DAG. So my initial (incomplete) attempt at doing this is not needed.

is something similar possible in prefect?

Yes - it seems to be. Prefect seems to be passing in the "manager" to the context to execute arbitrary code in the flow:

    with engine.start():
        while engine.is_running():
            with engine.run_context():
                engine.call_flow_fn()

It seems that is not possible to nest multiple flow decorated functions. .. Maybe here it is possible to change the flow decorator so that when the function is called can also check if it is inside the flow_build_context and trigger a recursive unravel of the flow functions?

This should now be fixed. If I understand correctly, the current DecoratedFlow should add itself to the context before "executing" (building the DAG for) the decorated function, and it should check whether it is nested inside another flow (by checking the flow context) and, if so, add itself to the parent flow - which is what I'm doing now.

I've added a test case for this.
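
To illustrate the nesting logic with a self-contained toy (hypothetical names; the real DecoratedFlow does considerably more):

import contextvars

_ctx: contextvars.ContextVar = contextvars.ContextVar("flow", default=None)

class ToyFlow:
    def __init__(self, name):
        self.name, self.children = name, []
        parent = _ctx.get()
        if parent is not None:  # nested: attach to the enclosing flow
            parent.children.append(self)

    def build(self, fn):
        token = _ctx.set(self)  # flows/jobs created inside fn attach to us
        try:
            fn()
        finally:
            _ctx.reset(token)
        return self

outer = ToyFlow("outer").build(lambda: ToyFlow("inner").build(lambda: None))
assert [c.name for c in outer.children] == ["inner"]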

Maybe the setting of name and output can even be done directly by the flow decorator?

Good idea. Done!

As it stands now, Response(replace=..) and dynamic flows are not working with this approach, and are represented by two xfail tests.

This seems to have to do with the creation of brand-new Flow objects, independent of existing flows, when replace is called:

replace = Flow(jobs=replace)

and

replace = Flow(jobs=replace, output=replace.output)

These two are the cases we'd like jobflow to support so that we can use it in our project. I'll dig into this some more, unless you have some thoughts already. I didn't want to delay this reply any longer!

@gpetretto (Contributor)

Hi @vineetbansal, thanks a lot for the updates! I will have a look at them. I just wanted to quickly clarify one point: I tried to fetch your latest changes and run the tests in test_flow_decorator.py, but they all seem to pass on my laptop. Even those marked with xfail. Could there be some differences depending on the environment? Or am I missing something?

@vineetbansal (Author)

@gpetretto - no - you're right. Somewhere along the line during my refactoring, the Response(replace=..) case seems to have been fixed - I should have used strict=True for those xfails. I'm adding another commit that removes those xfails. Fingers crossed..

@vineetbansal (Author)

@gpetretto - when you're looking at this PR, I wanted to bring something else to your attention as well, as I was going through its possible integration with quacc. This is again going back to your question:

It is not clear why for a DecoratedFlow the output should be different from the standard case of a normal Flow..

In the case of classic flows, the caller who creates the Flow is responsible for reaching into the results depending on what they "know" the eventual output of interest is (presumably by how they put the jobs together):

  job1 = add(1, 2)
  job2 = mult(job1.output, 3)
  flow = jf.Flow([job1, job2])

  responses = run_locally(flow, ensure_success=True)
  assert responses[job2.uuid][1].output == 9

or, the same thing but during Flow construction time:

  flow = jf.Flow([job1, job2], output=job2.output)

  responses = run_locally(flow, ensure_success=True)
  assert responses[flow.output.uuid][1].output == 9

However, in the case of a @flow decorated function (which should be opaque to the caller except for its inputs and output), it's not clear which uuid in the results the caller should reach in for (since the jobs/flows are all internal details of the @flow). For it to "just work", the use case would look like:

  @flow
  def workflow():
      job1 = add(1, 2)
      job2 = mult(job1.output, 3)
      return job2.output  # or `return job`

  response = run_locally(workflow(), ensure_success=True)
  assert response == 9

The latter is the use case where we use jobflow in quacc recipes.

We have a small tweak to this PR here that allows us to do this. Perhaps we can incorporate something like that into this PR (either now or as a separate PR?)

The way I envision it working would be (in case you do want to preserve compatibility with Flow):

@flow
def workflow(...):
    # return a dict of responses, keyed by uuid, similar to Flow()

@flow(return_dict=False)
def workflow(...):
    # return the resolved output

This also allows us to support cases where the output is not a single OutputReference or Flow, but could easily be resolved anyway:

  @flow(return_dict=False)
  def my_flow(a, b):
      return [add(a, a), add(b, b), add(a, b)]

@gpetretto (Contributor) commented Nov 15, 2025

Hi @vineetbansal,

thanks for your additional comments.
I think that your comparison between Flow and @flow is not entirely fair, though. It seems to me that in the @flow example you are making it more complicated to access the results by passing workflow() directly to run_locally. In fact you can achieve the same behaviour as in the Flow example by simply assigning workflow() to a variable:

@flow
def workflow():
    job1 = add(1, 2)
    job2 = mult(job1.output, 3)
    return job2.output  # or `return job`

f = workflow()
response = run_locally(f, ensure_success=True)
assert response[f.output.uuid][1].output == 9
assert response[f.jobs[1].uuid][1].output == 9

If you instead consider it necessary for run_locally to return just the output of the whole Flow, instead of the dict of responses, I think it would make more sense to have an option directly in run_locally and apply it independently of whether the input is a Flow or a DecoratedFlow.

--- a/src/jobflow/managers/local.py
+++ b/src/jobflow/managers/local.py
@@ -22,6 +22,7 @@ def run_locally(
     ensure_success: bool = False,
     allow_external_references: bool = False,
     raise_immediately: bool = False,
+    return_flow_result: bool = False,
 ) -> dict[str, dict[int, jobflow.Response]]:
     """
     Run a :obj:`Job` or :obj:`Flow` locally.
@@ -84,6 +85,9 @@ def run_locally(

     flow = get_flow(flow, allow_external_references=allow_external_references)

+    if return_flow_result and flow.output is None:
+        raise ValueError("The Flow should have an output if return_flow_result is True")
+
     stopped_parents: set[str] = set()
     errored: set[str] = set()
     responses: dict[str, dict[int, jobflow.Response]] = defaultdict(dict)
@@ -183,4 +187,7 @@ def run_locally(
     if ensure_success and not finished_successfully:
         raise RuntimeError("Flow did not finish running successfully")

+    if return_flow_result:
+        max_out_ind = max(responses[flow.output.uuid].keys())
+        return responses[flow.output.uuid][max_out_ind].output
     return dict(responses)

With these changes you can just run the same for both Flow and @flow:

job1 = add(1, 2)
job2 = mult(job1.output, 3)
f = Flow([job1, job2], output=job2.output)

responses = run_locally(f, ensure_success=True, return_flow_result=True)
assert responses == 9

@flow
def workflow():
    job1 = add(1, 2)
    job2 = mult(job1.output, 3)
    return job2.output  # or `return job`

f = workflow()
response = run_locally(f, ensure_success=True, return_flow_result=True)
assert response == 9

I did not test this with more complicated workflows or indexes larger than 1, so maybe some adaptation might be needed.

Does this solve your issue? If not I think it would be better if you can link an example from quacc so that it is clearer where this is actually used.

Also, note that all these solutions will only work when executing the workflows with run_locally. In fireworks and jobflow-remote there is no "Flow output" in the DB, so the only option to get the final results is to use the job uuids. It might be interesting to add these, but the changes will be much bigger in that case.

Comments on the implementation

I also went through your last changes. I think it is a very good solution, as it may not require any change at all for jobflow-remote. I tried to submit a simple @flow and it worked directly. Some further investigation is needed to verify that everything works as expected, but it is definitely promising.
I have also found one more way to break the current implementation, but I may also have a solution.
Consider that Flow Makers are used very often, especially in atomate2. So it may be important to combine a @flow function with a Maker. However, the following example currently does not work:

@job
def add(a, b):
    return a + b

@dataclass
class AddFlowMaker(Maker):
    def make(self, a, b):
        j1 = add(a, b)
        j2 = add(j1.output, 2)
        return Flow([j1, j2])

@flow
def add_combine(a, b):
    j = add(a, b)
    f = AddFlowMaker().make(1, j.output)
    return f

f = add_combine(1, 2)

result = run_locally(f, ensure_success=True)

The reason is that in this case the code tries to assign the same Job to multiple flows as if each were its first flow addition. I have made a change that makes the code above run and seems to keep consistency with the expected behaviour, but more investigation might be needed here as well:

--- a/src/jobflow/core/flow.py
+++ b/src/jobflow/core/flow.py
@@ -827,10 +828,13 @@ class Flow(MSONable):
         job_ids = set(self.all_uuids)
         hosts = [self.uuid, *self.hosts]
         for job in jobs:
-            if job.host is not None and job.host != self.uuid:
+            flow_context = _current_flow_context.get()
+
+            if job.host is not None and job.host != self.uuid and (flow_context is not None and flow_context.uuid != job.host):
+            #if job.host is not None and job.host != self.uuid:
                 raise ValueError(
                     f"{type(job).__name__} {job.name} ({job.uuid}) already belongs "
                     f"to another flow."
                 )
             if job.uuid in job_ids:
                 raise ValueError(
@@ -845,7 +849,8 @@ class Flow(MSONable):
                 )
             job_ids.add(job.uuid)
             if job.host != self.uuid:
-                job.add_hosts_uuids(hosts)
+                prepend = flow_context is not None and flow_context.uuid == job.host
+                job.add_hosts_uuids(hosts, prepend=prepend)
         self._jobs += tuple(jobs)

     def remove_jobs(self, indices: int | list[int]):

Do you think this will be compatible in general with your solution?

Thanks for all the efforts!

@Andrew-S-Rosen (Member)

Just adding a note for @vineetbansal that the Maker class can be found in jobflow.core.maker.Maker. Additional context can be found here.

@gpetretto (Contributor)

Just adding a note for @vineetbansal that the Maker class can be found in jobflow.core.maker.Maker. Additional context can be found here.

Just one additional clarification here: the examples in the link are all Job Makers. I did not test them, but I suppose they will work, since a Job Maker is basically just a convenient way of defining the Job and it still uses the @job decorator. In the case of Flow Makers, they build and return a Flow object, which is the critical point that generates the error in the code above. The same error happens if you explicitly define a Flow inside a @flow:

@flow
def add_combine(a, b):
    j = add(a, b)
    f1 = Flow(j, j.output)
    return add_single(f1.output, 3)

f = add_combine(1, 2)
result = run_locally(f, ensure_success=True)

This is basically equivalent to the code with the Maker. The Maker in the example of the previous post just includes the creation of the Flow object in its make method.
Small examples of Flow Makers are here or, more involved, in atomate.

@vineetbansal vineetbansal changed the title [WIP] An initial stab at a flow decorator An initial stab at a flow decorator Nov 15, 2025
@vineetbansal vineetbansal changed the title An initial stab at a flow decorator A flow decorator Nov 15, 2025
@vineetbansal (Author) commented Nov 17, 2025

Thanks @gpetretto - this all makes sense.

I think that your comparison between the Flow and @flow is not entirely fair though..

Fair point - this was a brain-freeze on my end! The f.output reach-in will of course still work, and should be fine for our use case.

Thanks also for the changes you suggest to run_locally to return results. I think they need not go in right now - eventually quacc integrates with jobflow through jobflow-remote, and if jobflow-remote won't handle those, then we can just document how the final results can be obtained from the response dictionary (as we document it now for the Flow object).

I've added one last minor tweak in the latest commit - I've removed the insistence that the output of the @flow decorated function be a Job/Flow/OutputReference:

-        elif not isinstance(output, OutputReference):
-            raise RuntimeError(
-                "A @flow decorated function must return a Job or an OutputReference"

Even if the return values of those functions are not returned directly but in a dict (which is fine), these @flows in quacc are often constructed such that they return lists of flows. The fact that a list is returned is immaterial - it's only the proper instantiation of the Job/Flow objects that we eventually care about. A toy example of this in the tests is:

  @flow
  def my_flow(a, b):
      return [add(a, a), add(b, b)]

  f = my_flow(1, 2)
  _ = run_locally(f, ensure_success=True)

In these cases, DecoratedFlow will still construct the correct DAG, and jobflow-remote need not return the flow output, so hopefully this will be okay.

For the other failure mode that you found, I've added an xfail test - test_flow_returns_flow. I didn't want to make the suggested changes to the code directly, partly because I don't totally understand the order of these uuids and why they have to be prepended, but also because, with this change, some of the current tests that are expected to raise exceptions no longer do so. For example:

    add_job = get_test_job()
    Flow([add_job])
    with pytest.raises(ValueError, match="already belongs to another flow"):
        Flow([add_job])

Handling this failure mode is not a blocker for us at this stage, but let me know if you'd rather get it in with this PR, and I can investigate and incorporate it as well. However, with this latest commit, I think we have all we need to use this flow decorator in quacc.

@gpetretto (Contributor)

Hi @vineetbansal,
thanks for the updates and sorry for the delay. I spent some time on the point of using the flow decorator with Makers, because I think this is a potentially important use case. Let me address the different points separately.

Return objects

In the example that you proposed:

  @flow
  def my_flow(a, b):
      return [add(a, a), add(b, b)]

  f = my_flow(1, 2)
  _ = run_locally(f, ensure_success=True)

Is this the correct approach? Shouldn't the return be return [add(a, a).output, add(b, b).output]? Otherwise, combining this with other jobs/flows will lead to failures:

@flow
def add_single_list(a, b):
    return [add(a, 2), add(b, 3)]

@flow
def add_combine(a, b):
    f1 = add_single_list(a, b)
    return add_single_list(f1.output[0], 3)

f = add_combine(1, 2)
result = run_locally(f, ensure_success=True)

Instead, it works if the return of the first @flow is return [add(a, 2).output, add(b, 3).output].
This part thus behaves a bit inconsistently:
https://github.com/vineetbansal/jobflow/blob/907944bad66c73d08d721a9072285be21429d935/src/jobflow/core/flow.py#L951-L960
It warns of one potential issue, but then "fixes" it for the user without saying so. But then for the case above one still gets the warning from here: https://github.com/vineetbansal/jobflow/blob/907944bad66c73d08d721a9072285be21429d935/src/jobflow/core/flow.py#L283
but in this case the problem is not fixed automatically.
Since handling all the possible cases, replacing each Job/Flow with its outputs, would be difficult, I think it may be possible to always leave the output as is and rely on the already existing warning if any Job/Flow is present in any form in the output. However, I do not have a strong opinion on this matter. If the replacing of the output is left as is, I would at least suggest explicitly mentioning in the warning message that the output has been replaced.

Combining with makers

Sorry, my previous condition was partially incorrect. I found a better one, but it still left inconsistencies in the order of the hosts. The reason for the initial prepend that I proposed was related to how the hosts attribute is defined:
https://github.com/vineetbansal/jobflow/blob/907944bad66c73d08d721a9072285be21429d935/src/jobflow/core/job.py#L273
But I tested more involved cases and verified that it did not handle all of them correctly.

So, I took the liberty of making a somewhat larger change to the procedure by which the jobs are added to the decorated Flow. It seems that this can handle all the previous cases, plus all possible usages of nested Flow and Job objects inside them. I made some tests with involved combinations and the execution and hosts order were correct in those cases. However, it is difficult to write tests that cover all possible combinations, so a further check of the logic might be needed.

I have pushed it in this branch: https://github.com/gpetretto/jobflow/tree/test_flow_dec

The main change here is that, instead of calling add_jobs when the Job is created, calling the function decorated with @flow first collects all the jobs, and then adds to the DecoratedFlow only those that have not already been assigned to another host (see the toy sketch below). This should both prevent the "already belongs to another flow" error and provide the correct order in the hosts attribute.
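
As a rough, self-contained illustration of the idea (toy objects only; the actual change lives in the branch linked above):

class ToyJob:
    def __init__(self):
        self.host = None  # set when the job is added to a flow

class ToyFlow:
    def __init__(self, jobs=()):
        self.uuid = id(self)  # stand-in for the real uuid
        self.host = None
        self.jobs = []
        self.add_jobs(jobs)

    def add_jobs(self, jobs):
        for j in jobs:
            j.host = self.uuid
            self.jobs.append(j)

# everything created while the @flow function runs is collected first...
j1, j2 = ToyJob(), ToyJob()
inner = ToyFlow([j2])  # j2 is now hosted by the inner flow
collected = [j1, j2, inner]

# ...and only objects not already assigned to a host join the DecoratedFlow
decorated = ToyFlow([obj for obj in collected if obj.host is None])
assert j1 in decorated.jobs and inner in decorated.jobs
assert j2 not in decorated.jobs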

In addition, I handled the case of the Flow Makers, so that the name attribute is taken into account in an example like this:

@dataclass
class TestMaker(Maker):
    a: int
    name: str = "test_maker"

    @flow
    def make(self, b):
        j = add(self.a, b)
        return j.output

As mentioned above, I think these functionalities will be interesting to have, but I don't know if they are strictly necessary. At this point I think it is up to @utf to decide whether it is important to cover all the cases (with the suggestion in my branch or any other solution) or to go on with the current implementation in this PR.

@vineetbansal (Author) commented Dec 3, 2025

@gpetretto - thanks for the detailed feedback and the fixes for the more involved tests. Yes - your fix of having the context be a list instead of a single Flow, and only adding jobs whose host is None, makes sense, and still allows all the tests in our quacc repo to pass (I tested against this branch).

Thanks also for the fix for the Makers.

For cases like these:

  @flow
  def my_flow(a, b):
      return [add(a, a), add(b, b)]

The intention is simply to construct the Jobs that go inside the @flow, and the [add(a, a), add(b, b)] part (or that anything is returned, really) is just an excuse to construct the appropriate Jobs. We're able to handle this (and more complicated cases of @flow functions returning lists/dicts, etc.) in our code, so it doesn't have to go into jobflow.

For the one minor thing that you're flexible on:

However, I do not have a strong opinion on this matter. If the replacing of the output is left as is, I would at least suggest to explicitly mention in the warning message that the output has been replaced.

I've included the following snippet in my latest commit to this branch (after merging your changes), making the automatic replacement explicit:

        if isinstance(output, (jobflow.Job, jobflow.Flow)):
            warnings.warn(
                f"@flow decorated function '{name}' contains a Flow or "
                f"Job as an output. Usually the output should be the output of "
                f"a Job or another Flow (e.g. job.output). Replacing the "
                f"output of the @flow with the output of the Flow/Job. "
                f"If this message is unexpected then double check the outputs "
                f"of your @flow decorated function.",
                stacklevel=2,
            )
            output = output.output

This will allow our "recipes" to work seamlessly with jobflow in addition to prefect/parsl etc. (because those workflow engines make no distinction between a job and its output) without a lot of surgery, so hopefully this is still okay.

If you're satisfied with these changes, then perhaps we prod @utf to merge this? I'd be happy to add any additional docstrings/testcases of course, but otherwise I think we're there now..

@gpetretto (Contributor)

Hi @vineetbansal, thanks for checking the changes and including them in the PR.

The intention is simply to construct the Jobs that go inside the @flow, and the [add(a, a), add(b, b)] part (or that anything is returned, really) is just an excuse to construct the appropriate Jobs. We're able to handle this (and more complicated cases of @flow functions returning lists/dicts, etc.) in our code, so it doesn't have to go into jobflow.

I just have a doubt about this point. Since the resolution of the references is done inside jobflow, how do you deal with its inconsistencies inside quacc, if that is sitting on top of jobflow? Do you actually change the Flow output there?

If you would rather keep the replacement of the output, I wonder if it may be worth recursively exploring the output object (similarly to what is done here) and replacing all the instances of Job and Flow with their outputs.
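
A minimal sketch of what such a recursive replacement could look like, assuming only list/tuple/dict containers need handling (jobflow's Job and Flow both expose .output):

from jobflow import Flow, Job

def resolve_outputs(obj):
    # replace every Job/Flow with its output reference, recursing into containers
    if isinstance(obj, (Job, Flow)):
        return obj.output
    if isinstance(obj, (list, tuple)):
        return type(obj)(resolve_outputs(o) for o in obj)
    if isinstance(obj, dict):
        return {k: resolve_outputs(v) for k, v in obj.items()}
    return obj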

If you're satisfied with these changes, then perhaps we prod @utf to merge this? I'd be happy to add any additional docstrings/testcases of course, but otherwise I think we're there now..

I don't have further comments and I think the code is at a good stage. I agree that this can probably be merged, so @utf can review the changes.
If everything is fine with him, maybe a small section in the documentation can be added, so that other users can discover the new functionality?

@vineetbansal (Author)

Thanks @gpetretto. I've tweaked certain docstrings now that what goes inside the flow context is a list of Job/Flow objects, and not the decorated job.

For your final doubt:

I just have a doubt about this point. Since the resolution of the references is done inside jobflow, how do you deal with its inconsistencies inside quacc, if that is sitting on top of jobflow? Do you actually change the Flow output there?

In response to my comment:

The intention is simply to construct the Jobs that go inside the @flow, and the [add(a, a), add(b, b)] part (or that anything is returned, really) is just an excuse to construct the appropriate Jobs. We're able to handle this (and more complicated cases of @flow functions returning lists/dicts, etc.) in our code, so it doesn't have to go into jobflow.

It turns out that I was wrong; though this was a patch initially, we're no longer manipulating any output from @flows in quacc, so I should have just said:

The intention is simply to construct the Jobs that go inside the @flow, and the [add(a, a), add(b, b)] part (or that anything is returned, really) is just an excuse to construct the appropriate Jobs.

The returning of a single Job or Flow is indeed a special case we were hoping to support, and the minor accommodation of resolving it to its .output as we've done here is all we need.

@utf - This is ready for your review. Despite the detailed back and forth between me and @gpetretto , you may find that the actual code changes are surprisingly minimal. The tests should cover almost all cases that we're hoping to support with a @flow decorator.

@Andrew-S-Rosen (Member)

There is one thing not captured here that I will highlight: if one wants to make a dynamic subflow, there is still a need to call Response(replace=). It might be possible to improve this syntax as well: #416 (reply in thread)

@gpetretto (Contributor)

Hi @Andrew-S-Rosen,

to be fair, I find the following notation rather confusing:

@flow
@job
def dynamic_job(maker):

At first sight I do not believe its meaning is obvious. I am also not sure how well this fits into the current @flow implementation.
Wouldn't this change be more related to the @job decorator than to @flow? Maybe with an argument, like @job(replace=True), or with a specific decorator @replace to combine with @job:

@replace
@job
def dynamic_job(maker):

I think this may have some benefits:

  • similar options could also be used to handle addition and detour
  • it will not mess with the implementation of @flow, so it should be easier to implement
  • at least in my opinion, the effect of such options would be clearer.

There is also a completely different alternative. Right now, if a Job returns an instance of Job or Flow, the code prints a warning but leaves things as they are, using the Job object as output (which I cannot think of a use case for). We could change the current implementation so that, if the output is an instance of a Job or Flow, the code will automatically interpret this as a replace. This would not even require additional objects or notations. The downside is that this would be a backward-incompatible change, although it seems unlikely that anyone currently has a Flow returned from their jobs.

@Andrew-S-Rosen (Member)

@gpetretto Yes, I agree the stacked @flow/@job decorator is confusing (Alex was probably drawing parallels with Covalent, which had done that). But I do not think the syntax is the main point; after all, there was also a suggestion to just define a new decorator called @subflow, which other workflow engines also have for dynamic workflows (in Parsl, it is @join_app, for instance). Whatever the syntax, I think there could be value here in further minimizing the number of changes to the underlying source code. I will leave this to @vineetbansal to consider, perhaps in a separate PR if there is promise, but I didn't want to lose track of the comment.

@vineetbansal (Author) commented Dec 19, 2025

Thanks @gpetretto, @Andrew-S-Rosen. I played around with @gpetretto's idea of "..could change the current implementation so that if the output is an instance of a Job or Flow the code will automatically interpret this as a replace" (plus support for lists of Jobs, which is our common use case), by adding a bit of logic at the top of the Response.from_job_returns method:

--- a/src/jobflow/core/job.py
+++ b/src/jobflow/core/job.py
@@ -1286,6 +1286,12 @@ class Response(typing.Generic[T]):
         Response
             The job response controlling the data to store and flow execution options.
         """
+        should_replace = isinstance(job_returns, Job)
+        if isinstance(job_returns, (list, tuple)):
+            should_replace = all(isinstance(resp, Job) for resp in job_returns)
+        if should_replace:
+            job_returns = Response(replace=job_returns)
+
         if isinstance(job_returns, Response):
             if job_returns.replace is None:
                 # only apply output schema if there is no replace.

and tried it on:

@job
def add(x, y):
    return x + y

@job
def make_list_of_3(a):
    return [a] * 3

@job
def add_distributed(list_a):
    jobs = [add(val, 1) for val in list_a]
    return jobs

@flow
def add_distributed_flow(a):
    job1 = make_list_of_3(a)
    job2 = add_distributed(job1.output)
    return job2.output

my_flow = add_distributed_flow(2)

and it's working as expected with run_locally as well as jobflow-remote's submit_flow (both with scheduler_type: shell and scheduler_type: slurm).

If this looks promising, perhaps we can open a new PR on this, since it is unrelated to the @flow decorator? I'm sure there are edge cases I may not be thinking about.

@Andrew-S-Rosen (Member)

@vineetbansal that is excellent to hear. Yes, I think this would be great to have in a separate PR. I think that covers all the cases that @utf brought up a while ago and will collectively provide a very clean experience.

@gpetretto (Contributor)

Hi @vineetbansal, sorry for the late reply. Thanks for testing that idea! Good to see it works.
I agree that it would be better to implement this in a separate PR, since it is not strictly related to the @flow decorator and is a change in the way jobflow handles the job return.

As a minor comment, in your implementation you may want to replace isinstance(job_returns, Job) with isinstance(job_returns, Job | Flow).
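
For example, a sketch of the adjusted check (using the tuple form so it also runs on Python < 3.10):

from jobflow import Flow, Job

def should_replace(job_returns) -> bool:
    # True if the return value is a Job/Flow, or a non-empty list/tuple of them
    if isinstance(job_returns, (Job, Flow)):
        return True
    if isinstance(job_returns, (list, tuple)) and job_returns:
        return all(isinstance(r, (Job, Flow)) for r in job_returns)
    return False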
