A flow decorator #815
base: main
Conversation
Also pinging @gpetretto in case he has any thoughts about what considerations might come into play in getting this or a similar approach to work nicely with Jobflow-Remote.
More extensive tests and cases covered.
At a high level, to break down what's happening:
Parallel changes would have to be made there as well. I've added some more tests to cover what I think would be typical scenarios in using the `@flow` decorator.
Hi @vineetbansal and @Andrew-S-Rosen,
… work; dynamic flows do not work yet
Thanks @gpetretto - your comments were most insightful and after a few iterations of overly complicating and then simplifying things on my end, I think I've ended up at more or less the same place as you had intuited.
Indeed - my original intuition was that the
Yes - I talked to @Andrew-S-Rosen and we're not using
Yes - it seems to be. Prefect seems to be passing in the "manager" to the context to execute arbitrary code in the flow:
This should now be fixed. If I understand correctly, the current behaviour is what you expected; I've added a test case for this.
Good idea. Done! As it stands now, this seems to have to do with the creation of brand-new jobs in `jobflow/src/jobflow/core/job.py` (line 1383 in d208e46 and line 1409 in d208e46). These two are the cases that we'd like to support in jobflow to be able to use it in our project. I'll dig into this some more, unless you have some thoughts already. I didn't want to delay this reply any more!
Hi @vineetbansal, thanks a lot for the updates! I will have a look at them. I just wanted to quickly clarify one point: I tried to fetch your latest changes and run the tests locally.
@gpetretto - no - you're right. Somewhere along the line during my refactoring this broke.
@gpetretto - when you're looking at this PR, I wanted to bring something else to your attention as well, as I was going through its possible integration with quacc. This is again going back to your question:
In the case of classic flows, the caller who creates the `Flow` sets its output explicitly (or, the same thing, but during `make`). However, in the case of a decorated flow function, the output comes from the function's return value; the latter is the use case we have in quacc. We have a small tweak to this PR here that allows us to do this. Perhaps we can incorporate something like that into this PR (either now or as a separate PR?). The way I envision it working would be to make this opt-in, in case you do want to preserve compatibility with the existing behaviour. This also allows us to support cases where the output is not a single object.
Hi @vineetbansal, thanks for your additional comments.

```python
@flow
def workflow():
    job1 = add(1, 2)
    job2 = mult(job1.output, 3)
    return job2.output  # or `return job`

f = workflow()
response = run_locally(f, ensure_success=True)
assert response[f.output.uuid][1].output == 9
assert response[f.jobs[1].uuid][1].output == 9
```

If you instead consider it necessary to have the flow's result returned directly, something along these lines could work:

```diff
--- a/src/jobflow/managers/local.py
+++ b/src/jobflow/managers/local.py
@@ -22,6 +22,7 @@ def run_locally(
     ensure_success: bool = False,
     allow_external_references: bool = False,
     raise_immediately: bool = False,
+    return_flow_result: bool = False,
 ) -> dict[str, dict[int, jobflow.Response]]:
     """
     Run a :obj:`Job` or :obj:`Flow` locally.
@@ -84,6 +85,9 @@ def run_locally(
     flow = get_flow(flow, allow_external_references=allow_external_references)
 
+    if return_flow_result and flow.output is None:
+        raise ValueError("The Flow should have an output if return_flow_result is True")
+
     stopped_parents: set[str] = set()
     errored: set[str] = set()
     responses: dict[str, dict[int, jobflow.Response]] = defaultdict(dict)
@@ -183,4 +187,7 @@ def run_locally(
     if ensure_success and not finished_successfully:
         raise RuntimeError("Flow did not finish running successfully")
 
+    if return_flow_result:
+        max_out_ind = max(responses[flow.output.uuid].keys())
+        return responses[flow.output.uuid][max_out_ind].output
+
     return dict(responses)
```

With these changes you can just run the same code for both a plain `Flow`:

```python
job1 = add(1, 2)
job2 = mult(job1.output, 3)
f = Flow([job1, job2], output=job2.output)
responses = run_locally(f, ensure_success=True, return_flow_result=True)
assert responses == 9
```

and a decorated flow:

```python
@flow
def workflow():
    job1 = add(1, 2)
    job2 = mult(job1.output, 3)
    return job2.output  # or `return job`

f = workflow()
response = run_locally(f, ensure_success=True, return_flow_result=True)
assert response == 9
```

I did not test this with more complicated workflows or indexes larger than 1, so maybe some adaptation might be needed. Does this solve your issue? If not, I think it would be better if you can link an example from quacc so that it is clearer where this is actually used. Also, note that all these solutions will only work when executing the workflows with `run_locally`.

**Comments on the implementation**

I also went through your last changes. I think it is a very good solution, as it may not require any change at all for jobflow-remote. I tried to submit a simple example combining the decorator with a `Maker`, but it fails:

```python
@job
def add(a, b):
    return a + b

@dataclass
class AddFlowMaker(Maker):
    def make(self, a, b):
        j1 = add(a, b)
        j2 = add(j1.output, 2)
        return Flow([j1, j2])

@flow
def add_combine(a, b):
    j = add(a, b)
    f = AddFlowMaker().make(1, j.output)
    return f

f = add_combine(1, 2)
result = run_locally(f, ensure_success=True)
```

The reason is that in this case the code tries to assign the same Job to multiple flows as if it was its first flow addition. I have made a change that makes the code above run and seems to keep consistency with the expected behaviour, but more investigation might be needed here as well:

```diff
--- a/src/jobflow/core/flow.py
+++ b/src/jobflow/core/flow.py
@@ -827,10 +828,13 @@ class Flow(MSONable):
         job_ids = set(self.all_uuids)
         hosts = [self.uuid, *self.hosts]
         for job in jobs:
-            if job.host is not None and job.host != self.uuid:
+            flow_context = _current_flow_context.get()
+
+            if job.host is not None and job.host != self.uuid and (flow_context is not None and flow_context.uuid != job.host):
                 raise ValueError(
                     f"{type(job).__name__} {job.name} ({job.uuid}) already belongs "
                     f"to another flow."
                 )
             if job.uuid in job_ids:
                 raise ValueError(
@@ -845,7 +849,8 @@ class Flow(MSONable):
                 )
             job_ids.add(job.uuid)
             if job.host != self.uuid:
-                job.add_hosts_uuids(hosts)
+                prepend = flow_context is not None and flow_context.uuid == job.host
+                job.add_hosts_uuids(hosts, prepend=prepend)
         self._jobs += tuple(jobs)
 
     def remove_jobs(self, indices: int | list[int]):
```

Do you think this will be compatible in general with your solution? Thanks for all the efforts!
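To make the `prepend` logic concrete, here is a toy model (hypothetical classes, not jobflow's real ones) of why the insertion side matters: if the job's host list is ordered from the innermost flow outwards, a direct parent flow that adopts the job *after* the decorated flow context has already registered it must insert its uuid at the front.

```python
class Job:
    """Toy stand-in: tracks only the list of host flow uuids."""

    def __init__(self):
        self.hosts = []  # innermost host first

    def add_hosts_uuids(self, uuids, prepend=False):
        # Mirrors the prepend option used in the suggested patch.
        if prepend:
            self.hosts = list(uuids) + self.hosts
        else:
            self.hosts = self.hosts + list(uuids)

job = Job()
# The decorated flow context registers the job first...
job.add_hosts_uuids(["decorated-flow"])
# ...but the Maker's inner Flow is the job's *direct* parent, so its uuid
# must end up innermost, i.e. prepended:
job.add_hosts_uuids(["maker-flow"], prepend=True)
print(job.hosts)  # ['maker-flow', 'decorated-flow']
```

Without the prepend, the decorated flow would incorrectly appear to be the job's immediate host even though the Maker's flow contains it directly.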
Just adding a note for @vineetbansal pointing to the relevant quacc examples.
Just one additional clarification here: the examples in the link are all Job Makers. I did not test them, but I suppose they will work, since basically it is just a convenient way of defining the Job, and it still uses the same underlying mechanism:

```python
@flow
def add_combine(a, b):
    j = add(a, b)
    f1 = Flow(j, j.output)
    return add_single(f1.output, 3)

f = add_combine(1, 2)
result = run_locally(f, ensure_success=True)
```

This is basically equivalent to the code with the Maker. The Maker in the example of the previous post is just including the creation of the `Flow`.
Thanks @gpetretto - this all makes sense.
Fair point - this was a brain-freeze on my end! Thanks also for the changes you suggested. I've added one last minor tweak in the latest commit - I've removed the insistence on the output of the decorated function being a single reference. Even if the return values of those functions are not returned directly but in a dict (which is fine), these cases are now handled. For the other failure mode that you found, I've added an xfail test (`jobflow/tests/core/test_flow.py`, line 130 in d208e46).
Handling this failure mode is not a blocker for us at this stage, but let me know if you'd rather get it in with this PR, and I can investigate and incorporate it as well. However, with this latest commit, I think we have all we need to use this flow decorator in quacc.
Hi @vineetbansal,

**Return objects**

In the example that you proposed: is this the correct approach? Shouldn't the return be different? For example, this fails:

```python
@flow
def add_single_list(a, b):
    return [add(a, 2), add(b, 3)]

@flow
def add_combine(a, b):
    f1 = add_single_list(a, b)
    return add_single_list(f1.output[0], 3)

f = add_combine(1, 2)
result = run_locally(f, ensure_success=True)
```

Instead it works if the return of the first flow is changed accordingly.

**Combining with makers**

Sorry, my previous condition was partially incorrect. I found a better one, but this still left inconsistencies in the order of the hosts. So, I took the liberty of making a somewhat larger change to the procedure in which the jobs are added to the decorated Flow. It seems that this could handle all the previous cases, plus all possible usages of nested flows. I have pushed it in this branch: https://github.com/gpetretto/jobflow/tree/test_flow_dec

The main change here is in the way the decorated Flow collects the jobs from the context. In addition, I handled the case of the Flow Makers, so that the `make` method can also be decorated:

```python
@dataclass
class TestMaker(Maker):
    a: int
    name: str = "test_maker"

    @flow
    def make(self, b):
        j = add(self.a, b)
        return j.output
```

As mentioned above, I think these functionalities will be interesting to have, but I don't know if it is strictly necessary. At this point I think it is up to @utf to decide if it is important to cover all the cases (with the suggestion in my branch or any other solution) or to go on with the current implementation in this PR.
…ts .output, with an explicit message about the replacement
@gpetretto - thanks for the detailed feedback and the fixes for the more involved tests. Yes - your fix regarding having the context as a list instead of a single object works well. Thanks also for the fix for the Maker case. For cases like these, the intention is simply to construct the flow from the return value.
For the one minor thing that you're flexible on: I've included the snippet in my latest commit to this branch (after merging your changes), being explicit about the automatic replacement. This will allow our "recipes" to work seamlessly with the `@flow` decorator. If you're satisfied with these changes, then perhaps we prod @utf to merge this? I'd be happy to add any additional docstrings/test cases of course, but otherwise I think we're there now.
Hi @vineetbansal, thanks for checking the changes and including them in the PR.
I just have a doubt about this point. Since the resolution of the references is done when the job runs, I am not sure this will always behave as expected. If you would rather keep the replacement of the output, I wonder if it may be worth recursively exploring the output object (similarly to what is done here).
I don't have further comments and I think the code is at a good stage. I agree that this can probably be merged, so @utf can review the changes.
This reverts commit 30b06c1.
…d to the context as a list
Thanks @gpetretto. I've tweaked certain docstrings now that what goes inside the flow context is a list of Job/Flow objects, and not the decorated job. For your final doubt:
In response to my comment:
It turns out that I was wrong; though this was a patch initially, we're no longer manipulating any output from the jobs.
The returning of a single object is handled cleanly now. @utf - this is ready for your review. Despite the detailed back and forth between @gpetretto and me, you may find that the actual code changes are surprisingly minimal. The tests should cover almost all cases that we're hoping to support with a `@flow` decorator.
There is one thing not captured here that I will highlight: if one wants to make a dynamic subflow, there is still a need to call `Response(replace=...)`. It might be possible to improve this syntax as well: #416 (reply in thread)
Hi @Andrew-S-Rosen, to be fair I would say that I find the notation rather confusing:

```python
@flow
@job
def dynamic_job(maker):
    ...
```

At first sight I do not believe its meaning is obvious. I am also not sure how well this fits in the current implementation. A dedicated decorator might be clearer:

```python
@replace
@job
def dynamic_job(maker):
    ...
```

I think this may have some benefits.

There is also a completely different alternative. Right now a dynamic job needs to return a `Response` explicitly; we could change the current implementation so that if the output is an instance of a Job or Flow the code will automatically interpret this as a replace.
@gpetretto Yes, I agree the stacked decorators are confusing at first sight.
Thanks @gpetretto, @Andrew-S-Rosen. I played around with @gpetretto's idea of "..could change the current implementation so that if the output is an instance of a Job or Flow the code will automatically interpret this as a replace." (plus support for lists of these), tried it on a few examples, and it's working as expected. If this looks promising, perhaps we can open a new PR on this, since it is unrelated to the `@flow` decorator itself.
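As a sketch of what that automatic interpretation could look like (a toy stand-in, not the actual jobflow implementation; `Job`, `Response`, and the `job` decorator here are simplified, made-up classes):

```python
from dataclasses import dataclass

@dataclass
class Job:
    """Toy stand-in for a job object."""
    name: str

@dataclass
class Response:
    """Toy stand-in: either an ordinary output or a replacement."""
    output: object = None
    replace: object = None

def job(func):
    """If the wrapped function returns Job(s), treat that as a replacement."""
    def wrapper(*args, **kwargs):
        result = func(*args, **kwargs)
        if isinstance(result, Job) or (
            isinstance(result, list)
            and result
            and all(isinstance(r, Job) for r in result)
        ):
            # A Job (or list of Jobs) is automatically interpreted as a replace.
            return Response(replace=result)
        return Response(output=result)
    return wrapper

@job
def dynamic_job():
    return Job("generated")  # interpreted as a replace

@job
def plain_job():
    return 42  # ordinary output

print(dynamic_job().replace)  # Job(name='generated')
print(plain_job().output)     # 42
```

In a real implementation the same check could also accept `Flow` instances, and explicit `Response` returns would pass through unchanged.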
@vineetbansal that is excellent to hear. Yes, I think this would be great to have in a separate PR. I think that covers all the cases that @utf brought up a while ago and will collectively provide a very clean experience.
Hi @vineetbansal, sorry for the late reply. Thanks for testing that idea! Good to see it works. As a minor comment, there is one small substitution you may want to make in your implementation.
Summary
Include a summary of major changes in bullet points:
I was talking to @Andrew-S-Rosen about implementing a `@flow` decorator for `jobflow`, prompted by a previous discussion on this topic. Here's an initial stab at it. It uses a "flow context" (an idea also used in the Prefect library) so that a job can figure out if it's running inside a decorated flow function and, if so, add itself to the flow's job list. I believe this should help us get around the "updating global variables" issue mentioned in a related PR in the linked discussion.
A `@flow` decorator replacement.
Additional dependencies introduced (if any)
None
TODO (if any)
I have almost no history with using `jobflow`, so I'm certainly not thinking about all use cases; any pointers (initially failing tests that I can add to cover different scenarios before tackling them) will be welcome.
Checklist
Work-in-progress pull requests are encouraged, but please put [WIP] in the pull request
title.
Before a pull request can be merged, the following items must be checked:
The easiest way to handle this is to run the following in the correct sequence on
your local machine. Start with running black on your new code. This will
automatically reformat your code to PEP8 conventions and remove most issues. Then run
pycodestyle, followed by flake8.
Run pydocstyle on your code.
Type check your code.
Note that the CI system will run all the above checks. But it will be much more
efficient if you already fix most errors prior to submitting the PR. It is highly
recommended that you use the pre-commit hook provided in the repository. Simply
`pip install pre-commit` and then `pre-commit install`, and a check will be run prior to allowing commits.