Initialize and shutdown ray session in each executor #844
base: ray-api
Conversation
…an/NeMo-Curator into praateek/test-cuda-context Signed-off-by: Praateek <praateekm@gmail.com>
Left some minor comments. Should add a bunch of TODOs but apart from that looks nice.
output_tasks: list[Task] = []
try:
    # Initialize ray
    ray.init(ignore_reinit_error=True)
NIT: Can we please add a comment here? Why are we doing this again, since getting the client might also do a ray init / ray start?
Also this should have the loguru serialiser?
This does have the loguru serialiser a few lines above. Added a comment too.
    output_tasks = self._dataset_to_tasks(current_dataset)
    logger.info(f"Pipeline completed. Final results: {len(output_tasks)} tasks")
finally:
    ray.shutdown()
Same as above. A comment here would be useful. If you have made an issue out of our findings, we can just link that here.
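For context, a minimal, self-contained sketch of the try/finally pattern these two hunks form (the helper name and the run_stages callback are hypothetical stand-ins, not the actual executor code):

import ray
from loguru import logger

def run_with_own_ray_session(run_stages):
    # Hypothetical helper mirroring the executor pattern in this diff:
    # each executor owns its Ray session and always tears it down.
    output_tasks = []
    try:
        # ignore_reinit_error=True keeps this safe even if another executor
        # already called ray.init() in this process.
        ray.init(ignore_reinit_error=True)
        output_tasks = run_stages()
        logger.info(f"Pipeline completed. Final results: {len(output_tasks)} tasks")
    finally:
        # Shut down so state/env vars from this session do not leak into the
        # next executor's Ray session.
        ray.shutdown()
    return output_tasks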
@@ -113,13 +114,14 @@ def execute(self, stages: list[ProcessingStage], initial_tasks: list[Task] | None
     logger.info(f"Execution mode: {exec_mode.name}")

     try:
-        # Run the pipeline
+        # Run the pipeline (this will initialize ray)
My recommendation would be to add ray.init here as well, along with the Ray loguru serializer, so that we are not dependent on Xenna and the call is exactly the same across executors.
If we add ray.init here, the ray init inside Xenna will effectively be useless, and I think that is what we want?
Yup made that change, and in fact that helps us solve this PR without Xenna changes too (along with one more change)
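A sketch of that suggestion, assuming a shared helper (the helper name is hypothetical, and the loguru serializer setup is only indicated by a comment since its exact call is not shown in this excerpt):

import ray

def _init_ray_session() -> None:
    # Every executor calls this same helper, so the ray.init call is identical
    # across executors and Xenna's internal ray.init becomes a no-op.
    ray.init(ignore_reinit_error=True)
    # The Ray/loguru serializer mentioned above would also be registered here.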
from ray.cluster_utils import Cluster


def find_free_port():
I think this function might be present in the actual code too. Can we add a TODO to maybe use that and remove this?
The reason we find the port ourselves is so that we can subsequently connect to that port using ray.init (instead of address="auto"), to avoid connecting to another cluster in case multiple are running.
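A minimal sketch of that idea (not the exact test code): pick a free port up front, start the head node on it, and attach to exactly that cluster.

import socket

def find_free_port() -> int:
    # Bind to port 0 and let the OS choose an unused port for us.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(("127.0.0.1", 0))
        return s.getsockname()[1]

# The test then starts a head node on this port (e.g. `ray start --head --port=<port>`)
# and connects to it explicitly with ray.init(address=f"127.0.0.1:{port}") rather than
# address="auto", which could attach to some other cluster that happens to be running.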
        str(2 * ONE_GB),
        "--block",
    ],
    env={**os.environ, "RAY_MAX_LIMIT_FROM_API_SERVER": "40000", "RAY_MAX_LIMIT_FROM_DATA_SOURCE": "40000"},
Add a TODO to use get_client here in the future?
Added, I hope we can. The only nuance is that we need to know the PID of the process that was started by ray start, so that we can kill it without doing ray stop, which might kill all Ray processes.
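A sketch of that nuance (flags and cleanup details are assumptions based on this excerpt): keep a handle on the subprocess launched for ray start so only that process is terminated, instead of running ray stop, which would take down every Ray process on the machine.

import os
import signal
import subprocess

port = 6380  # in the real test this would come from find_free_port()
proc = subprocess.Popen(
    ["ray", "start", "--head", f"--port={port}", "--block"],
    env={**os.environ, "RAY_MAX_LIMIT_FROM_API_SERVER": "40000", "RAY_MAX_LIMIT_FROM_DATA_SOURCE": "40000"},
)
try:
    ...  # run the tests against the cluster on that port
finally:
    # proc.pid identifies exactly the process we started; terminating it avoids
    # `ray stop`, which would kill all Ray processes, not just this cluster's.
    proc.send_signal(signal.SIGTERM)
    proc.wait()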
Signed-off-by: Praateek <praateekm@gmail.com>
Signed-off-by: Praateek <praateekm@gmail.com>
Description
This PR's goal is to ensure we are able to run different executor pipelines in the same Python process, e.g. a Xenna pipeline followed by a Ray Data pipeline (and vice versa). This is currently not possible without a shutdown because Ray preserves env vars from previous sessions.
Even if https://github.com/nvidia-cosmos/cosmos-xenna/pull/6/files does not merge, this PR still works as expected. Xenna sets
os.environ[RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES]
which means that if Ray Data runs after Xenna, Ray Data will not set CUDA_VISIBLE_DEVICES, i.e. it will use gpu_id=0. We solve that by forcefully setting RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES="" in the env vars while doing ray.init inside Ray Data.
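A sketch of that workaround, assuming the simplest mechanics (clearing the variable in the driver's environment before Ray Data initializes Ray):

import os
import ray

# Xenna leaves RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES set; clearing it before
# the next ray.init lets Ray manage CUDA_VISIBLE_DEVICES again, so Ray Data workers
# are not all pinned to gpu_id=0.
os.environ["RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES"] = ""
ray.init(ignore_reinit_error=True)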
Workarounds needed
However, because we now introduce ray.shutdown(), a few workarounds are required; see the following for a clearer understanding.
Usage
# Add snippet demonstrating usage
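As a hypothetical illustration only (the class and import names below are assumptions, not the actual NeMo-Curator API), the scenario this PR enables looks roughly like:

from nemo_curator.backends import RayDataExecutor, XennaExecutor  # assumed names/paths
from nemo_curator.pipeline import Pipeline                        # assumed name/path

pipeline = Pipeline(stages=[...])  # stages elided

# Each execute() call now does ray.init(...) on entry and ray.shutdown() on exit,
# so the second run starts from a clean Ray session with no env vars leaked from
# the first executor (e.g. RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES).
xenna_results = pipeline.run(XennaExecutor())
ray_data_results = pipeline.run(RayDataExecutor())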
Checklist